From e7b23911d15dd8a017ee3f5f6d11f178802a3ccb Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 5 Jul 2024 14:04:07 -0400 Subject: [PATCH] Update Launcher for using local data volumes. Updating Launcher to prepare services with local volume mounts when some data requirements must be fulfilled by local data on the physical node, and to update the relevant other args for starting worker services so that one worker on each node makes sure data gets prepared in local volumes as needed as part of job startup. --- .../lib/scheduler/dmod/scheduler/scheduler.py | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py index f9c91d158..ebf7a4734 100644 --- a/python/lib/scheduler/dmod/scheduler/scheduler.py +++ b/python/lib/scheduler/dmod/scheduler/scheduler.py @@ -364,7 +364,7 @@ def _ds_names_helper(cls, job: 'Job', worker_index: int, category: DataCategory, return list(dataset_names) # TODO (later): once we get to dynamic/custom images (i.e., for arbitrary BMI modules), make sure this still works - def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: + def _generate_docker_cmd_args(self, job: 'Job', worker_index: int, primary_workers: Dict[str, int]) -> List[str]: """ Create the Docker "CMD" arguments list to be used to start all services that will perform this job. @@ -379,6 +379,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: The job to have worker Docker services started, with those services needing "CMD" arguments generated. worker_index : int The particular worker service index in question, which will have a specific set of data requirements. + primary_workers : Dict[str, int] + Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job. Returns ------- @@ -407,7 +409,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: "--job-id": str(job.job_id), "--worker-index": str(worker_index)} if isinstance(job.model_request, AbstractNgenRequest): - docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job, worker_index)) + docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job=job, worker_index=worker_index, + primary_workers=primary_workers)) # Finally, convert the args map to a list, with each "flag"/key immediately preceding its value args_as_list = [] @@ -417,7 +420,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: return args_as_list - def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> Dict[str, str]: + def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int, + primary_workers: Dict[str, int]) -> Dict[str, str]: """ Prepare the specific Docker CMD arg applicable to Nextgen-based jobs @@ -432,6 +436,8 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) - The job to have worker Docker services started, with those services needing "CMD" arguments generated. worker_index : int The particular worker service index in question, which will have a specific set of data requirements. + primary_workers : Dict[str, int] + Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job. Returns ------- @@ -452,6 +458,7 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) - "--hydrofabric-dataset": self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)[0], "--config-dataset": self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG)[0], + "--primary-workers": ",".join(str(v) for _, v in primary_workers.items()) } if job.cpu_count > 1: @@ -710,14 +717,43 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]: secrets = [self.get_secret_reference(secret_name) for secret_name in ['object_store_exec_user_name', 'object_store_exec_user_passwd']] + # Decide (somewhat trivially) which worker per-host will be considered "primary" + # In particular, this node will handle processing any data that needs to be local on the host + # Also, always make worker 0 primary for its host + per_node_primary_workers = {job.allocations[0].hostname: 0} + for alloc_index in range(1, num_allocations): + if (hostname := job.allocations[alloc_index].hostname) not in per_node_primary_workers: + per_node_primary_workers[hostname] = alloc_index + for alloc_index in range(num_allocations): alloc = job.allocations[alloc_index] constraints_str = f"node.hostname == {alloc.hostname}" constraints = list(constraints_str.split("/")) - pattern = '{}:/dmod/datasets/{}/{}:rw' - mounts = [pattern.format(r.fulfilled_access_at, r.category.name.lower(), r.fulfilled_by) for r in - job.worker_data_requirements[alloc_index] if r.fulfilled_access_at is not None] + mounts = [] + for req in job.worker_data_requirements[alloc_index]: + # TODO: (later) not sure, but this seems like it may be problematic condition that requires exception + if req.fulfilled_access_at is None: + logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' " + f"has no value for `fulfilled_access_at`; skipping this {req.__class__.__name__} " + f"when assembling Docker service mounts.") + continue + cluster_volume = req.fulfilled_access_at + category_subdir = req.category.name.lower() + dataset_name = req.fulfilled_by + mounts.append(f"{cluster_volume}:/dmod/cluster_volumes/{category_subdir}/{dataset_name}") + + # Allow this to not have been set (for now at least), but log a warning + if req.needs_data_local is None: + logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' " + f"does not indicate explicitly whether the required data needs to be available " + f"locally during job execution.") + # For requirements that need local data, mount a local Docker volume for them + elif req.needs_data_local: + # TODO: (later/future) make sure something has checked to see that space is available on the nodes + local_volume = f"{dataset_name}_local_vol" + mounts.append(f"{local_volume}:/dmod/local_volumes/{category_subdir}/{dataset_name}") + #mounts.append('/local/model_as_a_service/docker_host_volumes/forcing_local:/dmod/datasets/forcing_local:rw') # Introduce a way to inject data access directly via env config, to potentially bypass things for testing bind_mount_from_env = getenv('DMOD_JOB_WORKER_HOST_MOUNT') @@ -744,9 +780,10 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]: service_params.capabilities_to_add = ['SYS_ADMIN'] #TODO check for proper service creation, return False if doesn't work - service = self.create_service(serviceParams=service_params, idx=alloc_index, - docker_cmd_args=self._generate_docker_cmd_args(job, alloc_index)) - + cmd_args = self._generate_docker_cmd_args(job=job, worker_index=alloc_index, + primary_workers=per_node_primary_workers) + service = self.create_service(serviceParams=service_params, idx=alloc_index, docker_cmd_args=cmd_args) + service_per_allocation.append(service) logging.info("\n")