diff --git a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py
index f9c91d158..ebf7a4734 100644
--- a/python/lib/scheduler/dmod/scheduler/scheduler.py
+++ b/python/lib/scheduler/dmod/scheduler/scheduler.py
@@ -364,7 +364,7 @@ def _ds_names_helper(cls, job: 'Job', worker_index: int, category: DataCategory,
         return list(dataset_names)
 
     # TODO (later): once we get to dynamic/custom images (i.e., for arbitrary BMI modules), make sure this still works
-    def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
+    def _generate_docker_cmd_args(self, job: 'Job', worker_index: int, primary_workers: Dict[str, int]) -> List[str]:
         """
         Create the Docker "CMD" arguments list to be used to start all services that will perform this job.
 
@@ -379,6 +379,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
             The job to have worker Docker services started, with those services needing "CMD" arguments generated.
         worker_index : int
             The particular worker service index in question, which will have a specific set of data requirements.
+        primary_workers : Dict[str, int]
+            Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job.
 
         Returns
         -------
@@ -407,7 +409,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
                               "--job-id": str(job.job_id),
                               "--worker-index": str(worker_index)}
         if isinstance(job.model_request, AbstractNgenRequest):
-            docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job, worker_index))
+            docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job=job, worker_index=worker_index,
+                                                                                 primary_workers=primary_workers))
 
         # Finally, convert the args map to a list, with each "flag"/key immediately preceding its value
         args_as_list = []
@@ -417,7 +420,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
 
         return args_as_list
 
-    def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> Dict[str, str]:
+    def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int,
+                                              primary_workers: Dict[str, int]) -> Dict[str, str]:
         """
         Prepare the specific Docker CMD arg applicable to Nextgen-based jobs
 
@@ -432,6 +436,8 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
             The job to have worker Docker services started, with those services needing "CMD" arguments generated.
         worker_index : int
             The particular worker service index in question, which will have a specific set of data requirements.
+        primary_workers : Dict[str, int]
+            Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job.
 
         Returns
         -------
@@ -452,6 +458,7 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
             "--hydrofabric-dataset": self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)[0],
             "--config-dataset": self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
                                                       data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG)[0],
+            "--primary-workers": ",".join(str(v) for _, v in primary_workers.items())
         }
 
         if job.cpu_count > 1:
@@ -710,14 +717,43 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]:
         secrets = [self.get_secret_reference(secret_name) for secret_name in
                    ['object_store_exec_user_name', 'object_store_exec_user_passwd']]
 
+        # Decide (somewhat trivially) which worker per-host will be considered "primary"
+        # In particular, this node will handle processing any data that needs to be local on the host
+        # Also, always make worker 0 primary for its host
+        per_node_primary_workers = {job.allocations[0].hostname: 0}
+        for alloc_index in range(1, num_allocations):
+            if (hostname := job.allocations[alloc_index].hostname) not in per_node_primary_workers:
+                per_node_primary_workers[hostname] = alloc_index
+
         for alloc_index in range(num_allocations):
             alloc = job.allocations[alloc_index]
             constraints_str = f"node.hostname == {alloc.hostname}"
             constraints = list(constraints_str.split("/"))
 
-            pattern = '{}:/dmod/datasets/{}/{}:rw'
-            mounts = [pattern.format(r.fulfilled_access_at, r.category.name.lower(), r.fulfilled_by) for r in
-                      job.worker_data_requirements[alloc_index] if r.fulfilled_access_at is not None]
+            mounts = []
+            for req in job.worker_data_requirements[alloc_index]:
+                # TODO: (later) not sure, but this seems like it may be problematic condition that requires exception
+                if req.fulfilled_access_at is None:
+                    logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' "
+                                    f"has no value for `fulfilled_access_at`; skipping this {req.__class__.__name__} "
+                                    f"when assembling Docker service mounts.")
+                    continue
+                cluster_volume = req.fulfilled_access_at
+                category_subdir = req.category.name.lower()
+                dataset_name = req.fulfilled_by
+                mounts.append(f"{cluster_volume}:/dmod/cluster_volumes/{category_subdir}/{dataset_name}")
+
+                # Allow this to not have been set (for now at least), but log a warning
+                if req.needs_data_local is None:
+                    logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' "
+                                    f"does not indicate explicitly whether the required data needs to be available "
+                                    f"locally during job execution.")
+                # For requirements that need local data, mount a local Docker volume for them
+                elif req.needs_data_local:
+                    # TODO: (later/future) make sure something has checked to see that space is available on the nodes
+                    local_volume = f"{dataset_name}_local_vol"
+                    mounts.append(f"{local_volume}:/dmod/local_volumes/{category_subdir}/{dataset_name}")
+
             #mounts.append('/local/model_as_a_service/docker_host_volumes/forcing_local:/dmod/datasets/forcing_local:rw')
             # Introduce a way to inject data access directly via env config, to potentially bypass things for testing
             bind_mount_from_env = getenv('DMOD_JOB_WORKER_HOST_MOUNT')
@@ -744,9 +780,10 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]:
                 service_params.capabilities_to_add = ['SYS_ADMIN']
 
             #TODO check for proper service creation, return False if doesn't work
-            service = self.create_service(serviceParams=service_params, idx=alloc_index,
-                                          docker_cmd_args=self._generate_docker_cmd_args(job, alloc_index))
-
+            cmd_args = self._generate_docker_cmd_args(job=job, worker_index=alloc_index,
+                                                      primary_workers=per_node_primary_workers)
+            service = self.create_service(serviceParams=service_params, idx=alloc_index, docker_cmd_args=cmd_args)
+
             service_per_allocation.append(service)
 
             logging.info("\n")
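
For reference, a minimal sketch of the new `--primary-workers` handoff, assuming a two-node job; the hostnames, worker indices, and the `parse_primary_workers` helper are illustrative only, not part of the patch:

```python
from typing import Dict, List

# Hypothetical 2-node job: worker 0 lands on node-01 and worker 3 is the first worker
# placed on node-02, mirroring the per_node_primary_workers mapping built in start_job().
per_node_primary_workers: Dict[str, int] = {"node-01": 0, "node-02": 3}

# Same serialization as in _generate_nextgen_job_docker_cmd_args: hostnames are dropped and
# only the primary worker indices are joined, in dict insertion (i.e., allocation) order.
arg_value = ",".join(str(v) for _, v in per_node_primary_workers.items())
assert arg_value == "0,3"


def parse_primary_workers(value: str) -> List[int]:
    """Hypothetical worker-side parsing of the received '--primary-workers' value."""
    return [int(index) for index in value.split(",")]


assert parse_primary_workers(arg_value) == [0, 3]
```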