diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py index 676d7fbc0..82a995682 100644 --- a/docker/main/ngen/py_funcs.py +++ b/docker/main/ngen/py_funcs.py @@ -1,16 +1,23 @@ #!/usr/bin/env python3 import argparse +import json import logging import os import shutil import tarfile +import concurrent.futures +import subprocess from datetime import datetime from enum import Enum from pathlib import Path from subprocess import Popen -from typing import Dict, List, Literal, Optional +from typing import Dict, List, Literal, Optional, Set, Tuple + + +def get_dmod_date_str_pattern() -> str: + return '%Y-%m-%d,%H:%M:%S' class ArchiveStrategy(Enum): @@ -56,6 +63,15 @@ def _subparse_move_to_directory(parent_subparser_container): sub_cmd_parser.add_argument("dest_dir", type=Path, help="Destination directory to which to move the output") +def _parse_for_make_data_local(parent_subparsers_container): + # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container + desc = "If a primary worker, copy/extract to make dataset data locally available on physical node" + helper_cmd_parser = parent_subparsers_container.add_parser('make_data_local', description=desc) + helper_cmd_parser.add_argument('worker_index', type=int, help='The index of this particular worker.') + helper_cmd_parser.add_argument('primary_workers', type=lambda s: {int(i) for i in s.split(',')}, + help='Comma-delimited string of primary worker indices.') + + def _parse_for_move_job_output(parent_subparsers_container): # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container desc = "Move output data files produced by a job to another location, typically to put them into a DMOD dataset." @@ -92,10 +108,91 @@ def _parse_args() -> argparse.Namespace: _parse_for_tar_and_copy(parent_subparsers_container=subparsers) _parse_for_gather_output(parent_subparsers_container=subparsers) _parse_for_move_job_output(parent_subparsers_container=subparsers) + _parse_for_make_data_local(parent_subparsers_container=subparsers) return parser.parse_args() +def _get_serial_dataset_dict(serialized_ds_file: Path) -> dict: + with serialized_ds_file.open() as s_file: + return json.loads(s_file.read()) + + +def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy: bool): + """ + Make the data in corresponding remotely-backed dataset directory local by placing in a local directory. + + Make a local, optimized copy of data from a dataset, where the data is also locally accessible/mounted but actually + stored elsewhere, making it less optimal for use by the worker. + + Function examines the serialized dataset file of the source directory and, if already present (indicating this dest + directory has been set up before), the analogous serialized dataset file in the destination directory. First, if + there is `dest_dir` version of this file, and if it has the same ``last_updated`` value, the function considers the + `dest_dir` contents to already be in sync with the `src_dir` simply returns. Second, it examines whether archiving + was used for the entire dataset, and then either extracts or copies data to the local volume as appropriate. + + Note that the function does alter the name of the serialized dataset file on the `dest_dir` side, primarily as an + indication that this is meant as a local copy of data, but not a full-fledge DMOD dataset. It also allows for a + final deliberate step of renaming (or copying with a different name) this file, which ensures the checked + ``last_updated`` value on the `dest_dir` side will have not been updated before a successful sync of the actual data + was completed. + + Parameters + ---------- + local_data_dir + Storage directory, locally available on this worker's host node, in which to copy/extract data. + do_optimized_object_store_copy + Whether to do an optimized copy for object store dataset data using the MinIO client (via a subprocess call). + """ + # TODO: (later) eventually source several details of this this from other part of the code + + dataset_vol_dir = get_cluster_volumes_root_directory().joinpath(local_data_dir.parent).joinpath(local_data_dir.name) + + local_serial_file = local_data_dir.joinpath(".ds_serial_state.json") + dataset_serial_file = dataset_vol_dir.joinpath(f"{dataset_vol_dir.name}_serialized.json") + + # Both should exist + if not dataset_vol_dir.is_dir(): + raise RuntimeError(f"Can't make data local from dataset mount path '{dataset_vol_dir!s}': not a directory") + elif not local_data_dir.is_dir(): + raise RuntimeError(f"Can't make data from '{dataset_vol_dir!s}' local: '{local_data_dir!s}' is not a directory") + # Also, dataset dir should not be empty + elif len([f for f in dataset_vol_dir.glob("*")]) == 0: + raise RuntimeError(f"Can't make data local from '{dataset_vol_dir!s}' local because it is empty") + + serial_ds_dict = _get_serial_dataset_dict(dataset_serial_file) + + # If dest_dir is not brand new and has something in it, check to make sure it isn't already as it needs to be + if local_serial_file.exists(): + prev_ds_dict = _get_serial_dataset_dict(local_serial_file) + current_last_updated = datetime.strptime(serial_ds_dict["last_updated"], get_dmod_date_str_pattern()) + prev_last_updated = datetime.strptime(prev_ds_dict["last_updated"], get_dmod_date_str_pattern()) + if prev_last_updated == current_last_updated: + logging.info(f"'{local_data_dir!s}' already shows most recent 'last_updated'; skipping redundant copy") + return + + # Determine if need to extract + if serial_ds_dict.get("data_archiving", True): + # Identify and extract archive + src_archive_file = [f for f in dataset_vol_dir.glob(f"{dataset_vol_dir.name}_archived*")][0] + archive_file = local_data_dir.joinpath(src_archive_file.name) + shutil.copy2(src_archive_file, archive_file) + shutil.unpack_archive(archive_file, local_data_dir) + archive_file.unlink(missing_ok=True) + # Also manually copy serialized state file (do last) + shutil.copy2(dataset_serial_file, local_serial_file) + # Need to optimize by using minio client directly here when dealing with OBJECT_STORE dataset, or will take 10x time + # TODO: (later) this is a bit of a hack, though a necessary one; find a way to integrate more elegantly + elif do_optimized_object_store_copy and serial_ds_dict["type"] == "OBJECT_STORE": + subprocess.run(["mc", "cp", "--config-dir", "/dmod/.mc", "-r", f"minio/{local_data_dir.name}/", f"{local_data_dir}/."]) + else: + # Otherwise copy contents + shutil.copy2(dataset_vol_dir, local_data_dir) + # Rename the copied serialized state file in the copy as needed + # But do this last to confirm directory contents are never more up-to-date with last_updated than expected + local_data_dir.joinpath(dataset_serial_file.name).rename(local_serial_file) + + def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[str] = None): """ Move source data files from their initial directory to a different directory, potentially combining into an archive. @@ -124,6 +221,14 @@ def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[ shutil.move(p, dest_dir) +def _parse_docker_secret(secret_name: str) -> str: + return Path("/run/secrets", secret_name).read_text().strip() + + +def _parse_object_store_secrets() -> Tuple[str, str]: + return _parse_docker_secret('object_store_exec_user_name'), _parse_docker_secret('object_store_exec_user_passwd') + + def gather_output(mpi_host_names: List[str], output_write_dir: Path): """ Using subprocesses, gather output from remote MPI hosts and collect in the analogous directory on this host. @@ -151,6 +256,95 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path): f"{error_in_bytes.decode()}") +def get_cluster_volumes_root_directory() -> Path: + """ + Get the root directory for cluster volumes (i.e., backed by dataset directly, synced cluster-wide) on this worker. + + Returns + ------- + Path + The root directory for cluster volumes on this worker. + """ + return Path("/dmod/cluster_volumes") + + +def get_local_volumes_root_directory() -> Path: + """ + Get the root directory for local volumes (i.e., local to physical node, share by all node's workers) on this worker. + + Returns + ------- + Path + The root directory for local volumes on this worker. + """ + return Path("/dmod/local_volumes") + + +def make_data_local(worker_index: int, primary_workers: Set[int]): + """ + Make data local for each local volume mount that exists, but only if this worker is a primary. + + Copy or extract data from mounted volumes/directories directly backed by DMOD datasets (i.e., "cluster volumes") to + corresponding directories local to the physical node (i.e. "local volumes"), for any such local directories found to + exist. An important distinction is that a local volume is local to the physical node and not the worker itself, and + thus is shared by all workers on that node. As such, return immediately without performing any actions if this + worker is not considered a "primary" worker, so that only one worker per node manipulates data. + + Function (for a primary worker) iterates through the local volume subdirectory paths to see if any local volumes + were set up when the worker was created. For any that are found, the function ensures data from the corresponding, + dataset-backed cluster volume directory is replicated in the local volume directory. + + Parameters + ---------- + worker_index + This worker's index. + primary_workers + Indices of designated primary workers + + See Also + -------- + _make_dataset_dir_local + """ + if worker_index not in primary_workers: + return + + cluster_vol_dir = get_cluster_volumes_root_directory() + local_vol_dir = get_local_volumes_root_directory() + expected_subdirs = {"config", "forcing", "hydrofabric", "observation", "output"} + + if not cluster_vol_dir.exists(): + raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' does not exist") + if not cluster_vol_dir.is_dir(): + raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' is not a directory") + if not local_vol_dir.exists(): + raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' does not exist") + if not local_vol_dir.is_dir(): + raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' is not a directory") + + try: + obj_store_access_key, obj_store_secret_key = _parse_object_store_secrets() + mc_ls_result = subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "ls", "minio"]) + if mc_ls_result.returncode != 0: + subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "set", "minio", obj_store_access_key, obj_store_secret_key]) + do_optimized_object_store_copy = True + except Exception as e: + logging.warning(f"Unable to parse secrets for optimized MinIO local data copying: {e!s}") + do_optimized_object_store_copy = False + + # Use some multi-threading here since this is IO heavy + futures = set() + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool: + for type_dir in (td for td in local_vol_dir.glob("*") if td.is_dir()): + if not cluster_vol_dir.joinpath(type_dir.name).is_dir(): + raise RuntimeError(f"Directory '{type_dir!s}' does not have analog in '{cluster_vol_dir!s}'") + if type_dir.name not in expected_subdirs: + logging.warning(f"Found unexpected (but matching) local volume data type subdirectory {type_dir.name}") + for local_ds_dir in (d for d in type_dir.glob("*") if d.is_dir()): + futures.add(pool.submit(_make_dataset_dir_local, local_ds_dir, do_optimized_object_store_copy)) + for future in futures: + future.result() + + def get_date_str() -> str: """ Get the current date and time as a string with format ``%Y-%m-%d,%H:%M:%S`` @@ -159,7 +353,7 @@ def get_date_str() -> str: ------- The current date and time as a string. """ - return datetime.now().strftime('%Y-%m-%d,%H:%M:%S') + return datetime.now().strftime(get_dmod_date_str_pattern()) def move_job_output(output_directory: Path, move_action: str, archiving: ArchiveStrategy = ArchiveStrategy.DYNAMIC, @@ -314,6 +508,8 @@ def main(): gather_output(mpi_host_names=[h for h in mpi_host_to_nproc_map], output_write_dir=args.output_write_dir) elif args.command == 'move_job_output': move_job_output(**(vars(args))) + elif args.command == 'make_data_local': + make_data_local(**(vars(args))) else: raise RuntimeError(f"Command arg '{args.command}' doesn't match a command supported by module's main function")