Skip to content

Commit

Permalink
Merge pull request #1059 from opencobra/refactor/sampling-optgp
Browse files Browse the repository at this point in the history
refactor: cobra.sampling.optgp
  • Loading branch information
synchon authored Mar 22, 2021
2 parents 0fd39ab + d4904a2 commit 0f5ab4e
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 90 deletions.
147 changes: 75 additions & 72 deletions src/cobra/sampling/optgp.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# -*- coding: utf-8 -*-

"""Provide OptGP sampler."""

from __future__ import absolute_import, division
"""Provide the OptGP sampler class and helper functions."""

from multiprocessing import Pool
from typing import TYPE_CHECKING, Dict, Optional, Tuple

import numpy as np
import pandas
import pandas as pd

from ..core.configuration import Configuration
from .core import step
from .hr_sampler import HRSampler, shared_np_array


from cobra.core.configuration import Configuration
from cobra.sampling.core import step
from cobra.sampling.hr_sampler import HRSampler, shared_np_array
if TYPE_CHECKING:
from cobra import Model
from cobra.sampling import OptGPSampler


__all__ = ("OptGPSampler",)
Expand All @@ -20,18 +22,18 @@
CONFIGURATION = Configuration()


def mp_init(obj):
def mp_init(obj: "OptGPSampler") -> None:
"""Initialize the multiprocessing pool."""
global sampler
sampler = obj


# Unfortunately this has to be outside the class to be usable with
# multiprocessing :()
def _sample_chain(args):
def _sample_chain(args: Tuple[int, int]) -> Tuple[int, "OptGPSampler"]:
"""Sample a single chain for OptGPSampler.
center and n_samples are updated locally and forgotten afterwards.
`center` and `n_samples` are updated locally and forgotten afterwards.
"""
n, idx = args # has to be this way to work in Python 2.7
Expand Down Expand Up @@ -67,59 +69,52 @@ def _sample_chain(args):


class OptGPSampler(HRSampler):
"""A parallel optimized sampler.
"""
Improved Artificial Centering Hit-and-Run sampler.
A parallel sampler with fast convergence and parallel execution. See [1]_
for details.
A parallel sampler with fast convergence and parallel execution.
See [1]_ for details.
Parameters
----------
model : cobra.Model
The cobra model from which to generate samples.
processes: int, optional (default Configuration.processes)
The number of processes used during sampling.
processes: int, optional
The number of processes used during sampling
(default cobra.Configuration.processes).
thinning : int, optional
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
The thinning factor of the generated sampling chain. A thinning of
10 means samples are returned every 10 steps (default 100).
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
How often to reproject the sampling point into the feasibility
space. Avoids numerical issues at the cost of lower sampling. If
you observe many equality constraint violations with
`sampler.validate` you should lower this number (default None).
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
Sets the random number seed. Initialized to the current time stamp
if None (default None).
Attributes
----------
model : cobra.Model
The cobra model from which the samples get generated.
thinning : int
The currently used thinning factor.
n_samples : int
The total number of samples that have been generated by this
sampler instance.
problem : collections.namedtuple
A python object whose attributes define the entire sampling problem in
matrix form. See docstring of `Problem`.
problem : typing.NamedTuple
A NamedTuple whose attributes define the entire sampling problem in
matrix form.
warmup : numpy.matrix
A matrix of with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
A numpy matrix with as many columns as reactions in the model and
more than 3 rows containing a warmup sample in each row. None if no
warmup points have been generated yet.
retries : int
The overall of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
nproj : int
How often to reproject the sampling point into the feasibility space.
fwd_idx : numpy.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
A numpy array having one entry for each reaction in the model,
containing the index of the respective forward variable.
rev_idx : numpy.array
Has one entry for each reaction in the model containing the index of
the respective reverse variable.
A numpy array having one entry for each reaction in the model,
containing the index of the respective reverse variable.
prev : numpy.array
The current/last flux sample generated.
center : numpy.array
Expand All @@ -129,20 +124,20 @@ class OptGPSampler(HRSampler):
Notes
-----
The sampler is very similar to artificial centering where each process
samples its own chain. Initial points are chosen randomly from the warmup
points followed by a linear transformation that pulls the points a little
bit towards the center of the sampling space.
samples its own chain. Initial points are chosen randomly from the
warmup points followed by a linear transformation that pulls the points
a little bit towards the center of the sampling space.
If the number of processes used is larger than the one requested,
number of samples is adjusted to the smallest multiple of the number of
processes larger than the requested sample number. For instance, if you
have 3 processes and request 8 samples you will receive 9.
have 3 processes and request 8 samples, you will receive 9.
Memory usage is roughly in the order of (2 * number reactions)^2
due to the required nullspace matrices and warmup points. So large
models easily take up a few GB of RAM. However, most of the large matrices
are kept in shared memory. So the RAM usage is independent of the number
of processes.
Memory usage is roughly in the order of (2 * number of reactions)^2
due to the required nullspace matrices and warmup points. So, large
models easily take up a few GBs of RAM. However, most of the large
matrices are kept in shared memory. So the RAM usage is independent of
the number of processes.
References
----------
Expand All @@ -154,9 +149,17 @@ class OptGPSampler(HRSampler):
"""

def __init__(self, model, processes=None, thinning=100, nproj=None, seed=None):
def __init__(
self,
model: "Model",
thinning: int = 100,
processes: Optional[int] = None,
nproj: Optional[int] = None,
seed: Optional[int] = None,
**kwargs
) -> None:
"""Initialize a new OptGPSampler."""
super(OptGPSampler, self).__init__(model, thinning, seed=seed)
super().__init__(model, thinning, nproj=nproj, seed=seed, *kwargs)
self.generate_fva_warmup()

if processes is None:
Expand All @@ -170,37 +173,37 @@ def __init__(self, model, processes=None, thinning=100, nproj=None, seed=None):
(len(self.model.variables),), self.warmup.mean(axis=0)
)

def sample(self, n, fluxes=True):
def sample(self, n: int, fluxes: bool = True) -> pd.DataFrame:
"""Generate a set of samples.
This is the basic sampling function for all hit-and-run samplers.
Parameters
----------
n : int
The minimum number of samples that are generated at once
(see Notes).
fluxes : boolean
Whether to return fluxes or the internal solver variables. If set
to False will return a variable for each forward and backward flux
as well as all additional variables you might have defined in the
model.
The minimum number of samples that are generated at once.
fluxes : bool, optional
Whether to return fluxes or the internal solver variables. If
set to False, will return a variable for each forward and
backward flux as well as all additional variables you might
have defined in the model (default True).
Returns
-------
numpy.matrix
Returns a matrix with `n` rows, each containing a flux sample.
pandas.DataFrame
Returns a pandas DataFrame with `n` rows, each containing a
flux sample.
Notes
-----
Performance of this function linearly depends on the number
of reactions in your model and the thinning factor.
If the number of processes is larger than one, computation is split
across as the CPUs of your machine. This may shorten computation time.
However, there is also overhead in setting up parallel computation so
we recommend to calculate large numbers of samples at once
(`n` > 1000).
across the CPU cores of your machine. This may shorten computation
time. However, there is also overhead in setting up parallel
computation primitives so, we recommend to calculate large numbers
of samples at once (`n` > 1000).
"""
if self.processes > 1:
Expand Down Expand Up @@ -234,17 +237,17 @@ def sample(self, n, fluxes=True):
if fluxes:
names = [r.id for r in self.model.reactions]

return pandas.DataFrame(
return pd.DataFrame(
chains[:, self.fwd_idx] - chains[:, self.rev_idx],
columns=names,
)
else:
names = [v.name for v in self.model.variables]

return pandas.DataFrame(chains, columns=names)
return pd.DataFrame(chains, columns=names)

# Models can be large so don't pass them around during multiprocessing
def __getstate__(self):
def __getstate__(self) -> Dict:
"""Return the object for serialization."""
d = dict(self.__dict__)
del d["model"]
Expand Down
2 changes: 1 addition & 1 deletion src/cobra/sampling/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def sample(model, n, method="optgp", thinning=100, processes=1, seed=None):
"""

if method == "optgp":
sampler = OptGPSampler(model, processes, thinning=thinning, seed=seed)
sampler = OptGPSampler(model, processes=processes, thinning=thinning, seed=seed)
elif method == "achr":
sampler = ACHRSampler(model, thinning=thinning, seed=seed)
else:
Expand Down
30 changes: 13 additions & 17 deletions src/cobra/test/test_sampling/test_optgp.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,61 @@
# -*- coding: utf-8 -*-

"""Test functionalities of OptGPSampler."""

from __future__ import absolute_import
from typing import TYPE_CHECKING

import numpy as np
import pytest

from cobra.sampling import OptGPSampler


if TYPE_CHECKING:
from cobra import Model
from cobra.sampling import ACHRSampler


@pytest.fixture(scope="function")
def optgp(model):
def optgp(model: "Model") -> OptGPSampler:
"""Return OptGPSampler instance for tests."""

sampler = OptGPSampler(model, processes=1, thinning=1)
assert (sampler.n_warmup > 0) and (sampler.n_warmup <= 2 * len(model.variables))
assert all(sampler.validate(sampler.warmup) == "v")

return sampler


def test_optgp_init_benchmark(model, benchmark):
def test_optgp_init_benchmark(model: "Model", benchmark) -> None:
"""Benchmark inital OptGP sampling."""

benchmark(lambda: OptGPSampler(model, processes=2))


def test_optgp_sample_benchmark(optgp, benchmark):
def test_optgp_sample_benchmark(optgp: "Model", benchmark) -> None:
"""Benchmark OptGP sampling."""

benchmark(optgp.sample, 1)


def test_sampling(optgp):
def test_sampling(optgp: OptGPSampler) -> None:
"""Test sampling."""

s = optgp.sample(10)
assert all(optgp.validate(s) == "v")


def test_batch_sampling(optgp):
def test_batch_sampling(optgp: OptGPSampler) -> None:
"""Test batch sampling."""

for b in optgp.batch(5, 4):
assert all(optgp.validate(b) == "v")


def test_variables_samples(achr, optgp):
def test_variables_samples(achr: "ACHRSampler", optgp: OptGPSampler) -> None:
"""Test variable samples."""

vnames = np.array([v.name for v in achr.model.variables])
s = optgp.sample(10, fluxes=False)
assert s.shape == (10, optgp.warmup.shape[1])
assert (s.columns == vnames).all()
assert (optgp.validate(s) == "v").all()


def test_reproject(optgp):
def test_reproject(optgp: OptGPSampler) -> None:
"""Test reprojection of sampling."""

s = optgp.sample(10, fluxes=False).values
proj = np.apply_along_axis(optgp._reproject, 1, s)
assert all(optgp.validate(proj) == "v")
Expand Down

0 comments on commit 0f5ab4e

Please sign in to comment.