Rationalise discovery of available memory and CPUs (dials#2619)
Rationalise the discovery of system resources (CPUs and available virtual memory) across DIALS, using tooling adapted from Dask and Dask Distributed.

Introduces dials.util.system.CPU_COUNT and dials.util.system.MEMORY_LIMIT.

Where the memory footprint is large, no longer add swap space to the total available memory when calculating the appropriate number of multiprocessing tasks.
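
For illustration, a minimal sketch of what this discovery might look like, assuming psutil is available (the real dials.util.system module, adapted from Dask's tooling, covers more platforms and container runtimes; the helper names and the cgroup handling below are simplified):

    import os

    import psutil


    def _cpu_count() -> int:
        # Prefer the process's CPU affinity mask where the platform
        # exposes it (Linux), since it reflects cgroup/taskset
        # restrictions; fall back to the raw core count elsewhere.
        try:
            return len(os.sched_getaffinity(0))
        except AttributeError:
            return os.cpu_count() or 1


    def _memory_limit() -> int:
        # Start from total physical memory, deliberately excluding swap.
        limit = psutil.virtual_memory().total
        # On Linux with cgroup v1, respect a container memory limit if set.
        try:
            with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f:
                limit = min(limit, int(f.read().strip()))
        except (OSError, ValueError):
            pass
        return limit


    CPU_COUNT = _cpu_count()
    MEMORY_LIMIT = _memory_limit()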

---------

Co-authored-by: Jim Crist <jcrist@users.noreply.github.com>
Co-authored-by: Albert DeFusco <albert.defusco@me.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: crusaderky <crusaderky@gmail.com>
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Co-authored-by: Samantha Hughes <shughes-uk@users.noreply.github.com>
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
Co-authored-by: Johan Olsson <johan@jmvo.se>
9 people authored Feb 29, 2024
1 parent f18f8af commit 6372cee
Showing 19 changed files with 196 additions and 312 deletions.
1 change: 1 addition & 0 deletions newsfragments/2619.misc
@@ -0,0 +1 @@
Perform more accurate and reliable discovery of available CPUs and virtual memory across a range of systems. This should make various DIALS tools, in particular ``dials.integrate``, function better when system resources are limited or DIALS requires a lot of memory.
4 changes: 2 additions & 2 deletions src/dials/algorithms/indexing/bravais_settings.py
@@ -22,7 +22,7 @@
get_symop_correlation_coefficients,
)
from dials.util.log import LoggingContext
from dials.util.mp import available_cores
from dials.util.system import CPU_COUNT

logger = logging.getLogger(__name__)

@@ -212,7 +212,7 @@ def refined_settings_from_refined_triclinic(
"""

if params.nproc is libtbx.Auto:
params.nproc = available_cores()
params.nproc = CPU_COUNT

if params.refinement.reflections.outlier.algorithm in ("auto", libtbx.Auto):
if experiments[0].goniometer is None:
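
For illustration only, the Auto-resolution pattern above as a standalone sketch (resolve_nproc is a hypothetical helper, not part of the DIALS API):

    import libtbx

    from dials.util.system import CPU_COUNT


    def resolve_nproc(params):
        # Replace libtbx.Auto with the detected CPU count so downstream
        # multiprocessing code always sees a concrete integer.
        if params.nproc is libtbx.Auto:
            params.nproc = CPU_COUNT
        return params.nproc
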
21 changes: 7 additions & 14 deletions src/dials/algorithms/integration/integrator.py
@@ -20,7 +20,6 @@
ProcessorFlat3D,
ProcessorSingle2D,
ProcessorStills,
assess_available_memory,
build_processor,
job,
)
@@ -39,6 +38,7 @@
from dials.util import Sorry, phil, pprint, tabulate
from dials.util.command_line import heading
from dials.util.report import Report
from dials.util.system import MEMORY_LIMIT
from dials_algorithms_integration_integrator_ext import (
Executor,
JobList,
@@ -999,7 +999,6 @@ def fit_profiles(self):

# Do profile modelling
if profile_fitting:

logger.info("=" * 80)
logger.info("")
logger.info(heading("Modelling reflection profiles"))
@@ -1019,7 +1018,6 @@
"** Skipping profile modelling - no reference profiles given **"
)
else:

# Try to set up the validation
if self.params.profile.validation.number_of_partitions > 1:
n = len(reference)
@@ -1134,7 +1132,6 @@ def fit_profiles(self):

# If we have more than 1 fold then do the validation
if num_folds > 1:

# Create the data processor
executor = ProfileValidatorExecutor(
self.experiments, profile_fitter
@@ -1283,24 +1280,21 @@ def _run_processor(reflections):
if self.params.integration.mp.method != "multiprocessing":
self.reflections, time_info = _run_processor(self.reflections)
else:
# need to do a memory check and decide whether to split table
available_immediate, _, __ = assess_available_memory(
self.params.integration
)

# here don't consider nproc as the processor will reduce nproc to 1
# if necessary, only want to split if we can't even process with
# nproc = 1
# Here, don't consider nproc as the processor will reduce nproc to 1 if
# necessary. Only want to split if we can't even process with nproc = 1

if self.params.integration.mp.n_subset_split:
tables = self.reflections.random_split(
self.params.integration.mp.n_subset_split
)
else:
# Need to do a memory check and decide whether to split table.
# Split if its size in memory exceeds the fraction of available memory
# specified by the PHIL parameter integration.block.max_memory_usage.
tables = _iterative_table_split(
[self.reflections],
self.experiments,
available_immediate,
MEMORY_LIMIT * self.params.integration.block.max_memory_usage,
)

if len(tables) == 1:
@@ -1527,7 +1521,6 @@ def integrate(self):

# Do profile modelling
if self.params.integration.profile.fitting:

logger.info("=" * 80)
logger.info("")
logger.info(heading("Modelling reflection profiles"))
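
A minimal sketch of the kind of splitting _iterative_table_split performs, assuming a hypothetical size_in_bytes callable for a reflection table (the real helper also consults the experiments when estimating memory use):

    from dials.util.system import MEMORY_LIMIT


    def split_until_fits(tables, size_in_bytes, max_memory_usage):
        # Split any table whose in-memory size exceeds the permitted
        # fraction of available memory, repeating until every piece fits.
        # Illustrative only: a production version would bound the number
        # of splits and account for per-process overhead.
        budget = MEMORY_LIMIT * max_memory_usage
        result = list(tables)
        while any(size_in_bytes(t) > budget for t in result):
            result = [
                piece
                for t in result
                for piece in (t.random_split(2) if size_in_bytes(t) > budget else [t])
            ]
        return result
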
24 changes: 7 additions & 17 deletions src/dials/algorithms/integration/parallel_integrator.py
@@ -4,15 +4,14 @@
import math
import pickle

import psutil

from libtbx import Auto

import dials.algorithms.integration
from dials.algorithms.integration.processor import NullTask, execute_parallel_task
from dials.array_family import flex
from dials.util import tabulate
from dials.util.mp import multi_node_parallel_map
from dials.util.system import MEMORY_LIMIT

# Need this import first because it loads an extension that parallel_integrator_ext
# relies on - it assumes the binding for EmpiricalProfileModeller exists
@@ -126,7 +125,6 @@ def create(experiments, params=None):
# Select the factory function
selection = params.integration.background.algorithm
if selection == "simple":

# Get parameters
params = params.integration.background.simple

Expand Down Expand Up @@ -159,7 +157,6 @@ def create(experiments, params=None):
algorithm = SimpleBackgroundCalculatorFactory.create(experiments, **kwargs)

elif selection == "glm":

# Get the parameters
params = params.integration.background.glm

@@ -172,7 +169,6 @@
)

elif selection == "gmodel":

# Get the parameters
params = params.integration.background.gmodel

@@ -214,7 +210,6 @@ def create(experiments, reference_profiles, params=None):
# Select the factory function
selection = params.profile.algorithm
if selection == "gaussian_rs":

# Get the parameters
params = params.profile.gaussian_rs.fitting

@@ -263,7 +258,6 @@ def create(experiments, params=None):
# Select the factory function
selection = params.profile.algorithm
if selection == "gaussian_rs":

# Get the parameters
params = params.profile.gaussian_rs.fitting

@@ -289,7 +283,7 @@ def _assert_enough_memory(required_memory, max_memory_usage):
:param required_memory: The required number of bytes
:param max_memory_usage: The maximum memory usage allowed
"""
total_memory = psutil.virtual_memory().total
total_memory = MEMORY_LIMIT
assert max_memory_usage > 0.0, "maximum memory usage must be > 0"
assert max_memory_usage <= 1.0, "maximum memory usage must be <= 1"
limit_memory = total_memory * max_memory_usage
@@ -300,9 +294,9 @@ def _assert_enough_memory(required_memory, max_memory_usage):
include increasing the percentage of memory allowed for shoeboxes or
decreasing the block size. This could also be caused by a highly mosaic
crystal model - is your crystal really this mosaic?
Total system memory: {total_memory / 1000000000.0:.1f} GB
Limit image memory: {limit_memory / 1000000000.0:.1f} GB
Required image memory: {required_memory / 1000000000.0:.1f} GB
Total system memory: {total_memory / 1e9:.1f} GB
Limit image memory: {limit_memory / 1e9:.1f} GB
Required image memory: {required_memory / 1e9:.1f} GB
"""
)
else:
@@ -641,7 +635,7 @@ def compute_max_block_size(self):
"""
Compute the required memory
"""
total_memory = psutil.virtual_memory().available
total_memory = MEMORY_LIMIT
max_memory_usage = self.params.integration.block.max_memory_usage
assert max_memory_usage > 0.0, "maximum memory usage must be > 0"
assert max_memory_usage <= 1.0, "maximum memory usage must be <= 1"
@@ -1111,7 +1105,7 @@ def compute_max_block_size(self):
"""
Compute the required memory
"""
total_memory = psutil.virtual_memory().available
total_memory = MEMORY_LIMIT
max_memory_usage = self.params.integration.block.max_memory_usage
assert max_memory_usage > 0.0, "maximum memory usage must be > 0"
assert max_memory_usage <= 1.0, "maximum memory usage must be <= 1"
@@ -1247,7 +1241,6 @@ def __init__(self, experiments, reflections, params=None):

# Execute each task
if params.integration.mp.njobs > 1:

if params.integration.mp.method == "multiprocessing":
_assert_enough_memory(
params.integration.mp.njobs
@@ -1311,7 +1304,6 @@ def profiles(self):

class IntegratorProcessor:
def __init__(self, experiments, reflections, reference=None, params=None):

# Create the reference manager
integration_manager = IntegrationManager(
experiments, reflections, reference, params
@@ -1322,7 +1314,6 @@ def __init__(self, experiments, reflections, reference=None, params=None):

# Execute each task
if params.integration.mp.njobs > 1:

if params.integration.mp.method == "multiprocessing":
_assert_enough_memory(
params.integration.mp.njobs
@@ -1374,7 +1365,6 @@ def split_partials_over_boundaries(reflections, block_size):
# See if any reflections need to be split
refl_to_split_sel = n_frames_of_bboxes > block_size
if refl_to_split_sel.count(True) > 0:

# Get the subset of reflections to be split
subset = reflections.select(refl_to_split_sel)
newset = flex.reflection_table()
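
For illustration, a standalone sketch of the budget check that _assert_enough_memory performs above (check_memory_budget is a hypothetical name, not DIALS API):

    from dials.util.system import MEMORY_LIMIT


    def check_memory_budget(required_memory, max_memory_usage):
        # Fail early, with a readable message, if a job would exceed the
        # permitted fraction of the discovered memory limit.
        assert 0.0 < max_memory_usage <= 1.0, "maximum memory usage must be in (0, 1]"
        limit_memory = MEMORY_LIMIT * max_memory_usage
        if required_memory > limit_memory:
            raise MemoryError(
                f"Not enough memory to run integration: "
                f"limit {limit_memory / 1e9:.1f} GB, "
                f"required {required_memory / 1e9:.1f} GB "
                f"(of {MEMORY_LIMIT / 1e9:.1f} GB total)"
            )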