Skip to content

Commit

Permalink
Update worker Python functions to make data local.
Browse files Browse the repository at this point in the history
Adding functionality to py_funcs.py to support making DMOD dataset data
local (not just be locally accessible from remote storage).
  • Loading branch information
robertbartel committed Jul 12, 2024
1 parent 75b8403 commit 4d254a2
Showing 1 changed file with 198 additions and 2 deletions.
200 changes: 198 additions & 2 deletions docker/main/ngen/py_funcs.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
#!/usr/bin/env python3

import argparse
import json
import logging
import os
import shutil
import tarfile
import concurrent.futures
import subprocess

from datetime import datetime
from enum import Enum
from pathlib import Path
from subprocess import Popen
from typing import Dict, List, Literal, Optional
from typing import Dict, List, Literal, Optional, Set, Tuple


def get_dmod_date_str_pattern() -> str:
return '%Y-%m-%d,%H:%M:%S'


class ArchiveStrategy(Enum):
Expand Down Expand Up @@ -56,6 +63,15 @@ def _subparse_move_to_directory(parent_subparser_container):
sub_cmd_parser.add_argument("dest_dir", type=Path, help="Destination directory to which to move the output")


def _parse_for_make_data_local(parent_subparsers_container):
# A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container
desc = "If a primary worker, copy/extract to make dataset data locally available on physical node"
helper_cmd_parser = parent_subparsers_container.add_parser('make_data_local', description=desc)
helper_cmd_parser.add_argument('worker_index', type=int, help='The index of this particular worker.')
helper_cmd_parser.add_argument('primary_workers', type=lambda s: {int(i) for i in s.split(',')},
help='Comma-delimited string of primary worker indices.')


def _parse_for_move_job_output(parent_subparsers_container):
# A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container
desc = "Move output data files produced by a job to another location, typically to put them into a DMOD dataset."
Expand Down Expand Up @@ -92,10 +108,91 @@ def _parse_args() -> argparse.Namespace:
_parse_for_tar_and_copy(parent_subparsers_container=subparsers)
_parse_for_gather_output(parent_subparsers_container=subparsers)
_parse_for_move_job_output(parent_subparsers_container=subparsers)
_parse_for_make_data_local(parent_subparsers_container=subparsers)

return parser.parse_args()


def _get_serial_dataset_dict(serialized_ds_file: Path) -> dict:
with serialized_ds_file.open() as s_file:
return json.loads(s_file.read())


def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy: bool):
"""
Make the data in corresponding remotely-backed dataset directory local by placing in a local directory.
Make a local, optimized copy of data from a dataset, where the data is also locally accessible/mounted but actually
stored elsewhere, making it less optimal for use by the worker.
Function examines the serialized dataset file of the source directory and, if already present (indicating this dest
directory has been set up before), the analogous serialized dataset file in the destination directory. First, if
there is `dest_dir` version of this file, and if it has the same ``last_updated`` value, the function considers the
`dest_dir` contents to already be in sync with the `src_dir` simply returns. Second, it examines whether archiving
was used for the entire dataset, and then either extracts or copies data to the local volume as appropriate.
Note that the function does alter the name of the serialized dataset file on the `dest_dir` side, primarily as an
indication that this is meant as a local copy of data, but not a full-fledge DMOD dataset. It also allows for a
final deliberate step of renaming (or copying with a different name) this file, which ensures the checked
``last_updated`` value on the `dest_dir` side will have not been updated before a successful sync of the actual data
was completed.
Parameters
----------
local_data_dir
Storage directory, locally available on this worker's host node, in which to copy/extract data.
do_optimized_object_store_copy
Whether to do an optimized copy for object store dataset data using the MinIO client (via a subprocess call).
"""
# TODO: (later) eventually source several details of this this from other part of the code

dataset_vol_dir = get_cluster_volumes_root_directory().joinpath(local_data_dir.parent).joinpath(local_data_dir.name)

local_serial_file = local_data_dir.joinpath(".ds_serial_state.json")
dataset_serial_file = dataset_vol_dir.joinpath(f"{dataset_vol_dir.name}_serialized.json")

# Both should exist
if not dataset_vol_dir.is_dir():
raise RuntimeError(f"Can't make data local from dataset mount path '{dataset_vol_dir!s}': not a directory")
elif not local_data_dir.is_dir():
raise RuntimeError(f"Can't make data from '{dataset_vol_dir!s}' local: '{local_data_dir!s}' is not a directory")
# Also, dataset dir should not be empty
elif len([f for f in dataset_vol_dir.glob("*")]) == 0:
raise RuntimeError(f"Can't make data local from '{dataset_vol_dir!s}' local because it is empty")

serial_ds_dict = _get_serial_dataset_dict(dataset_serial_file)

# If dest_dir is not brand new and has something in it, check to make sure it isn't already as it needs to be
if local_serial_file.exists():
prev_ds_dict = _get_serial_dataset_dict(local_serial_file)
current_last_updated = datetime.strptime(serial_ds_dict["last_updated"], get_dmod_date_str_pattern())
prev_last_updated = datetime.strptime(prev_ds_dict["last_updated"], get_dmod_date_str_pattern())
if prev_last_updated == current_last_updated:
logging.info(f"'{local_data_dir!s}' already shows most recent 'last_updated'; skipping redundant copy")
return

# Determine if need to extract
if serial_ds_dict.get("data_archiving", True):
# Identify and extract archive
src_archive_file = [f for f in dataset_vol_dir.glob(f"{dataset_vol_dir.name}_archived*")][0]
archive_file = local_data_dir.joinpath(src_archive_file.name)
shutil.copy2(src_archive_file, archive_file)
shutil.unpack_archive(archive_file, local_data_dir)
archive_file.unlink(missing_ok=True)
# Also manually copy serialized state file (do last)
shutil.copy2(dataset_serial_file, local_serial_file)
# Need to optimize by using minio client directly here when dealing with OBJECT_STORE dataset, or will take 10x time
# TODO: (later) this is a bit of a hack, though a necessary one; find a way to integrate more elegantly
elif do_optimized_object_store_copy and serial_ds_dict["type"] == "OBJECT_STORE":
subprocess.run(["mc", "cp", "--config-dir", "/dmod/.mc", "-r", f"minio/{local_data_dir.name}/", f"{local_data_dir}/."])
else:
# Otherwise copy contents
shutil.copy2(dataset_vol_dir, local_data_dir)
# Rename the copied serialized state file in the copy as needed
# But do this last to confirm directory contents are never more up-to-date with last_updated than expected
local_data_dir.joinpath(dataset_serial_file.name).rename(local_serial_file)


def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[str] = None):
"""
Move source data files from their initial directory to a different directory, potentially combining into an archive.
Expand Down Expand Up @@ -124,6 +221,14 @@ def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[
shutil.move(p, dest_dir)


def _parse_docker_secret(secret_name: str) -> str:
return Path("/run/secrets", secret_name).read_text().strip()


def _parse_object_store_secrets() -> Tuple[str, str]:
return _parse_docker_secret('object_store_exec_user_name'), _parse_docker_secret('object_store_exec_user_passwd')


def gather_output(mpi_host_names: List[str], output_write_dir: Path):
"""
Using subprocesses, gather output from remote MPI hosts and collect in the analogous directory on this host.
Expand Down Expand Up @@ -151,6 +256,95 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path):
f"{error_in_bytes.decode()}")


def get_cluster_volumes_root_directory() -> Path:
"""
Get the root directory for cluster volumes (i.e., backed by dataset directly, synced cluster-wide) on this worker.
Returns
-------
Path
The root directory for cluster volumes on this worker.
"""
return Path("/dmod/cluster_volumes")


def get_local_volumes_root_directory() -> Path:
"""
Get the root directory for local volumes (i.e., local to physical node, share by all node's workers) on this worker.
Returns
-------
Path
The root directory for local volumes on this worker.
"""
return Path("/dmod/local_volumes")


def make_data_local(worker_index: int, primary_workers: Set[int]):
"""
Make data local for each local volume mount that exists, but only if this worker is a primary.
Copy or extract data from mounted volumes/directories directly backed by DMOD datasets (i.e., "cluster volumes") to
corresponding directories local to the physical node (i.e. "local volumes"), for any such local directories found to
exist. An important distinction is that a local volume is local to the physical node and not the worker itself, and
thus is shared by all workers on that node. As such, return immediately without performing any actions if this
worker is not considered a "primary" worker, so that only one worker per node manipulates data.
Function (for a primary worker) iterates through the local volume subdirectory paths to see if any local volumes
were set up when the worker was created. For any that are found, the function ensures data from the corresponding,
dataset-backed cluster volume directory is replicated in the local volume directory.
Parameters
----------
worker_index
This worker's index.
primary_workers
Indices of designated primary workers
See Also
--------
_make_dataset_dir_local
"""
if worker_index not in primary_workers:
return

cluster_vol_dir = get_cluster_volumes_root_directory()
local_vol_dir = get_local_volumes_root_directory()
expected_subdirs = {"config", "forcing", "hydrofabric", "observation", "output"}

if not cluster_vol_dir.exists():
raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' does not exist")
if not cluster_vol_dir.is_dir():
raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' is not a directory")
if not local_vol_dir.exists():
raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' does not exist")
if not local_vol_dir.is_dir():
raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' is not a directory")

try:
obj_store_access_key, obj_store_secret_key = _parse_object_store_secrets()
mc_ls_result = subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "ls", "minio"])
if mc_ls_result.returncode != 0:
subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "set", "minio", obj_store_access_key, obj_store_secret_key])
do_optimized_object_store_copy = True
except Exception as e:
logging.warning(f"Unable to parse secrets for optimized MinIO local data copying: {e!s}")
do_optimized_object_store_copy = False

# Use some multi-threading here since this is IO heavy
futures = set()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
for type_dir in (td for td in local_vol_dir.glob("*") if td.is_dir()):
if not cluster_vol_dir.joinpath(type_dir.name).is_dir():
raise RuntimeError(f"Directory '{type_dir!s}' does not have analog in '{cluster_vol_dir!s}'")
if type_dir.name not in expected_subdirs:
logging.warning(f"Found unexpected (but matching) local volume data type subdirectory {type_dir.name}")
for local_ds_dir in (d for d in type_dir.glob("*") if d.is_dir()):
futures.add(pool.submit(_make_dataset_dir_local, local_ds_dir, do_optimized_object_store_copy))
for future in futures:
future.result()


def get_date_str() -> str:
"""
Get the current date and time as a string with format ``%Y-%m-%d,%H:%M:%S``
Expand All @@ -159,7 +353,7 @@ def get_date_str() -> str:
-------
The current date and time as a string.
"""
return datetime.now().strftime('%Y-%m-%d,%H:%M:%S')
return datetime.now().strftime(get_dmod_date_str_pattern())


def move_job_output(output_directory: Path, move_action: str, archiving: ArchiveStrategy = ArchiveStrategy.DYNAMIC,
Expand Down Expand Up @@ -314,6 +508,8 @@ def main():
gather_output(mpi_host_names=[h for h in mpi_host_to_nproc_map], output_write_dir=args.output_write_dir)
elif args.command == 'move_job_output':
move_job_output(**(vars(args)))
elif args.command == 'make_data_local':
make_data_local(**(vars(args)))
else:
raise RuntimeError(f"Command arg '{args.command}' doesn't match a command supported by module's main function")

Expand Down

0 comments on commit 4d254a2

Please sign in to comment.