diff --git a/docs/source/examples/MultiTaskTest.ipynb b/docs/source/examples/MultiTaskTest.ipynb index 937fd86..db58a8d 100644 --- a/docs/source/examples/MultiTaskTest.ipynb +++ b/docs/source/examples/MultiTaskTest.ipynb @@ -6,7 +6,7 @@ "metadata": {}, "source": [ "# Multi-Task Test\n", - "At first we have to install the newest version of fvgGP" + "At first we have to install the newest version of fvGP" ] }, { diff --git a/examples/MultiTaskTest.ipynb b/examples/MultiTaskTest.ipynb index dad3b4c..7007690 100644 --- a/examples/MultiTaskTest.ipynb +++ b/examples/MultiTaskTest.ipynb @@ -6,7 +6,7 @@ "metadata": {}, "source": [ "# Multi-Task Test\n", - "At first we have to install the newest version of fvgGP" + "At first we have to install the newest version of fvGP" ] }, { @@ -17,7 +17,7 @@ "outputs": [], "source": [ "##first install the newest version of fvgp\n", - "#!pip install fvgp==4.0.0" + "#!pip install fvgp==4.0.5" ] }, { @@ -177,12 +177,11 @@ " #gp_kernel_function=mkernel #what happens if comment this out? (adjust bounds below)\n", " )\n", "#change this based on kernel choice\n", - "hps_bounds = my_gp2.hps_bounds \n", "#hps_bounds = np.array([[0.001,10000.],[1.,1000.]])\n", "\n", "#my_gp2.update_gp_data(x_data3,y_data3)\n", "print(\"Global Training in progress\")\n", - "my_gp2.train(hps_bounds, max_iter = 200)" + "my_gp2.train(max_iter = 2)" ] }, { @@ -272,6 +271,14 @@ "fig.show()\n", "\n" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1694322d", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/examples/SingleTaskTest.ipynb b/examples/SingleTaskTest.ipynb index e637157..cc5d1a5 100644 --- a/examples/SingleTaskTest.ipynb +++ b/examples/SingleTaskTest.ipynb @@ -24,7 +24,7 @@ "outputs": [], "source": [ "##first install the newest version of fvgp\n", - "#!pip install fvgp==4.0.0" + "#!pip install fvgp==4.0.5" ] }, { @@ -215,7 +215,7 @@ "outputs": [], "source": [ "for i in range(10):\n", - " time.sleep(5)\n", + " time.sleep(2)\n", " my_gp1.update_hyperparameters(opt_obj)\n", " print(my_gp1.hyperparameters)\n", " print(\"\")" diff --git a/examples/gp2ScaleTest.ipynb b/examples/gp2ScaleTest.ipynb index 6fa7959..545e3f0 100644 --- a/examples/gp2ScaleTest.ipynb +++ b/examples/gp2ScaleTest.ipynb @@ -8,7 +8,7 @@ "# gp2Scale \n", "gp2Scale is a special setting in fvgp that combines non-stationary, compactly-supported kernels, HPC distributed computing, and sparse linear algebra to allow scale-up of exact GPs to millions of data points. Here we run a moderately-sized GP, just because we assume you might run this locally.\n", "\n", - "I hope it is clear how cool it is what is happening here. If you have a dask client that points to a remote cluster with 500 GPUs, you will distribute the covariance matrix computation across those. The full matrix is sparse will be fast to work with in downstream operations. The algorithm only makes use of naturally-occuring sparsity, so the result is exact in contrast to Vecchia or inducing-point methods." + "I hope it is clear how cool it is what is happening here. If you have a dask client that points to a remote cluster with 500 GPUs, you will distribute the covariance matrix computation across those. The full matrix is sparse and will be fast to work with in downstream operations. The algorithm only makes use of naturally-occuring sparsity, so the result is exact in contrast to Vecchia or inducing-point methods." ] }, { @@ -19,7 +19,7 @@ "outputs": [], "source": [ "##first install the newest version of fvgp\n", - "#!pip install fvgp==4.0.0" + "#!pip install fvgp==4.0.5" ] }, { diff --git a/examples/gp2Scale_example_HPC_jobscript.sl b/examples/gp2Scale_example_HPC_jobscript.sl index 99bb9f3..1f96e40 100644 --- a/examples/gp2Scale_example_HPC_jobscript.sl +++ b/examples/gp2Scale_example_HPC_jobscript.sl @@ -26,7 +26,7 @@ echo ${port} echo "starting scheduler" dask-scheduler --no-dashboard --no-show --host ${hn} --port ${port} & echo "starting workers" -srun -o dask_worker_info.txt dask-worker ${hn}:${port} & +srun -o dask_worker_info.txt dask-worker ${hn}:${port} --nthreads 1 & echo "starting gp2Scale" -python -u run_gp2ScaleGPU.py ${hn}:${port} ${number_of_workers} +python -u gp2Scale_example_HPC_RunScript.py ${hn}:${port} ${number_of_workers} diff --git a/fvgp/fvgp.py b/fvgp/fvgp.py index 777dab8..36c5ec5 100755 --- a/fvgp/fvgp.py +++ b/fvgp/fvgp.py @@ -68,6 +68,10 @@ class provides all the methods described for the GP class. fvgp.fvGP.gp_deep_kernel_layer_width. If you specify another kernel, please provide init_hyperparameters. + hyperparameter_bounds : np.ndarray, optional + A 2d numpy array of shape (N x 2), where N is the number of needed hyperparameters. + The default is None, in that case hyperparameter_bounds have to be specified + in the train calls or default bounds are used. Those only work for the default kernel. output_positions : np.ndarray, optional A 3-D numpy array of shape (U x output_number x output_dim), so that for each measurement position, the outputs are clearly defined by their positions in the output space. The default is np.array([[0],[1],[2],[3],...,[output_number - 1]]) for each @@ -204,6 +208,7 @@ def __init__( x_data, y_data, init_hyperparameters = None, + hyperparameter_bounds = None, output_positions = None, noise_variances = None, compute_device = "cpu", @@ -262,9 +267,9 @@ def __init__( self.hps_bounds[2:] = np.array([-1.,1.]) init_hps = np.random.uniform(low = self.hps_bounds[:,0], high = self.hps_bounds[:,1],size = len(self.hps_bounds)) warnings.warn("Hyperparameter bounds have been initialized automatically \ - \n for the default kernel in fvgp. They have to be provided to the training.\ - \n For instance: GP.train(hyperparameter_bounds = fvgp_obj_name.hps_bounds)\ + \n for the default kernel in fvgp. They will automatically used for the training.\ \n However, you can also define and provide new bounds.") + hyperparameter_bounds = self.hps_bounds ####init GP @@ -273,6 +278,7 @@ def __init__( x_data, y_data, init_hyperparameters = init_hps, + hyperparameter_bounds = hyperparameter_bounds, noise_variances = noise_variances, compute_device = compute_device, gp_kernel_function = gp_kernel_function, diff --git a/fvgp/gp.py b/fvgp/gp.py index 77fca57..ff079a5 100755 --- a/fvgp/gp.py +++ b/fvgp/gp.py @@ -9,6 +9,7 @@ from scipy.sparse import coo_matrix from scipy.sparse.linalg import splu from scipy.sparse.linalg import cg +from scipy.sparse.linalg import cgs from scipy.sparse.linalg import minres import dask.distributed as distributed import numpy as np @@ -22,9 +23,9 @@ from .gp2Scale import gp2Scale as gp2S from dask.distributed import Variable from dask.distributed import Client -from .sparse_matrix import gp2ScaleSparseMatrix from scipy.stats import norm from imate import logdet +import gc #TODO: @@ -57,6 +58,10 @@ class GP(): The default is an array of ones, with a shape appropriate for the default kernel (D + 1), which is an anisotropic Matern kernel with automatic relevance determination (ARD). + hyperparameter_bounds : np.ndarray, optional + A 2d numpy array of shape (N x 2), where N is the number of needed hyperparameters. + The default is None, in that case hyperparameter_bounds have to be specified + in the train calls or default bounds are used. Those only work for the default kernel. noise_variances : np.ndarray, optional An numpy array defining the uncertainties/noise in the data `y_data` in form of a point-wise variance. Shape (len(y_data), 1) or (len(y_data)). @@ -184,6 +189,7 @@ def __init__( x_data, y_data, init_hyperparameters = None, + hyperparameter_bounds = None, noise_variances = None, compute_device = "cpu", gp_kernel_function = None, @@ -232,6 +238,7 @@ def __init__( ########################################## if init_hyperparameters is None: init_hyperparameters = np.ones((input_space_dim + 1)) self.hyperparameters = init_hyperparameters + self.hyperparameter_bounds = hyperparameter_bounds ########################################## ##############preps for sparse mode####### @@ -469,10 +476,14 @@ def train(self, if init_hyperparameters is None: init_hyperparameters = np.array(self.hyperparameters) if hyperparameter_bounds is None: - warnings.warn("You have not provided hyperparameter bounds. Standard ones will be used but this might lead to suboptimal performance", stacklevel=2) - hyperparameter_bounds = np.zeros((len(init_hyperparameters),2)) - hyperparameter_bounds[0] = np.array([0.00001,1e8]) - hyperparameter_bounds[1:] = np.array([0.00001,1e8]) + if self.hyperparameter_bounds is None: + warnings.warn("You have not provided hyperparameter bounds. Standard ones will be used but this might lead to suboptimal performance", stacklevel=2) + hyperparameter_bounds = np.zeros((len(init_hyperparameters),2)) + hyperparameter_bounds[:] = np.array([0.00001,1e8]) + else: + hyperparameter_bounds = self.hyperparameter_bounds + + self.hyperparameters = self._optimize_log_likelihood( init_hyperparameters, np.array(hyperparameter_bounds), @@ -531,10 +542,12 @@ def train_async(self, if init_hyperparameters is None: init_hyperparameters = np.array(self.hyperparameters) if hyperparameter_bounds is None: - warnings.warn("You have not provided hyperparameter bounds. Standard ones will be used but this might lead to suboptimal performance", stacklevel=2) - hyperparameter_bounds = np.zeros((len(init_hyperparameters),2)) - hyperparameter_bounds[0] = np.array([0.00001,1e8]) - hyperparameter_bounds[1:] = np.array([0.00001,1e8]) + if self.hyperparameter_bounds is None: + warnings.warn("You have not provided hyperparameter bounds. Standard ones will be used but this might lead to suboptimal performance", stacklevel=2) + hyperparameter_bounds = np.zeros((len(init_hyperparameters),2)) + hyperparameter_bounds[:] = np.array([0.00001,1e8]) + else: + hyperparameter_bounds = self.hyperparameter_bounds opt_obj = self._optimize_log_likelihood_async( init_hyperparameters, @@ -946,12 +959,9 @@ def _compute_GPpriorV(self, x_data, y_data, hyperparameters, calc_inv = False): #get K if self.gp2Scale: st = time.time() - self.gp2Scale_obj.compute_covariance(hyperparameters, self.gp2Scale_dask_client) - if self.info: print("Computing the covariance matrix done after ",time.time()-st," seconds.", flush = True) - K = self.gp2Scale_obj.SparsePriorCovariance.get_result().result() - if self.info: print("Transferring the covariance matrix to host done after ",time.time()-st," seconds. sparsity = ", float(K.nnz)/float(len(x_data)**2) , flush = True) - self.gp2Scale_obj.SparsePriorCovariance.reset_prior().result() - if self.info: print("Reset the prior done.", flush = True) + K = self.gp2Scale_obj.compute_covariance(hyperparameters, self.gp2Scale_dask_client) + Ksparsity = float(K.nnz)/float(len(x_data)**2) + if self.info: print("Transferring the covariance matrix to host done after ",time.time()-st," seconds. sparsity = ", Ksparsity, flush = True) else: K = self._compute_K(hyperparameters) @@ -963,22 +973,41 @@ def _compute_GPpriorV(self, x_data, y_data, hyperparameters, calc_inv = False): #get Kinv/KVinvY, LU, Chol, logdet(KV) if self.gp2Scale: - if len(x_data) < 100000: - LU = splu(KV.tocsc()) - factorization_obj = ("LU", LU) - KVinvY = LU.solve(y_data - prior_mean_vec) - upper_diag = abs(LU.U.diagonal()) - KVlogdet = np.sum(np.log(upper_diag)) + #try fast but RAM intensive SuperLU first + if len(x_data) < 50000 and Ksparsity < 0.0001: + try: + LU = splu(KV.tocsc()) + factorization_obj = ("LU", LU) + KVinvY = LU.solve(y_data - prior_mean_vec) + upper_diag = abs(LU.U.diagonal()) + KVlogdet = np.sum(np.log(upper_diag)) + #if that did not work, do random lin algebra magic + except: + KVinvY, exit_code = minres(KV.tocsc(),y_data - prior_mean_vec) + factorization_obj = ("gp2Scale",None) + if exit_code != 0: + warnings.warn("CG solve not successful in gp2Scale. Trying MINRES") + KVinvY, exit_code = minres(KV.tocsc(),y_data - prior_mean_vec) + if self.compute_device == "gpu": gpu = True + else: gpu = False + KVlogdet, info_slq = logdet(KV, method='slq', min_num_samples=10, max_num_samples=100, + lanczos_degree=20, error_rtol=0.1, gpu = False, + return_info=True, plot=False, verbose=self.info) + #if the problem is large go with rand. lin. algebra straight away else: - KVinvY, exit_code = cg(KV.tocsc(),y_data - prior_mean_vec) + if self.info: print("MINRES solve in progress ...",time.time() - st,"seconds.") factorization_obj = ("gp2Scale",None) - if exit_code != 0: - warnings.warn("CG solve not successful in gp2Scale. Trying MINRES") - KVinvY, exit_code = minres(KV.tocsc(),y_data - prior_mean_vec) - KVlogdet, info_slq = logdet(KV, method='slq', min_num_samples=50, max_num_samples=200, - lanczos_degree=80, error_rtol=0.1, - return_info=True, plot=False, verbose=self.info) - if self.info: print("Solve and logdet/LU done after ",time.time() - st,"seconds.") + KVinvY, exit_code = minres(KV.tocsc(),y_data - prior_mean_vec) + if self.info: print("MINRES solve done after ",time.time() - st,"seconds.") + + if self.compute_device == "gpu": gpu = True + else: gpu = False + if self.info: print("logdet() in progress ... ",time.time() - st,"seconds.") + KVlogdet, info_slq = logdet(KV, method='slq', min_num_samples=10, max_num_samples=100, + lanczos_degree=20, error_rtol=0.1, orthogonalize=0, gpu = False, + return_info=True, plot=False, verbose=self.info) + if self.info: print("logdet/LU done after ",time.time() - st,"seconds.") + KVinv = None elif self.sparse_mode and self._is_sparse(KV): KV = csc_matrix(KV) @@ -1015,8 +1044,8 @@ def _KVsolve(self, b): res = np.empty((len(b),b.shape[1])) if b.shape[1]>100: warnings.warn("You want to predict at >100 points. \n When using gp2Scale, this takes a while. \n Better predict at only a handful of points.") for i in range(b.shape[1]): - res[:,i], exit_status = cg(self.KV,b[:,i]) - if exit_status != 0: res[:,i], exit_status = minres(self.KV,b[:,i]) + res[:,i], exit_status = minres(self.KV,b[:,i]) + #if exit_status != 0: res[:,i], exit_status = minres(self.KV,b[:,i]) return res def _logdet(self, A, factorization_obj = None): @@ -1168,9 +1197,9 @@ def posterior_mean(self, x_pred, hyperparameters = None, x_out = None): if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) - + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") k = self.kernel(x_data,x_pred,hps,self) A = k.T @ KVinvY @@ -1212,8 +1241,11 @@ def posterior_mean_grad(self, x_pred, hyperparameters = None, x_out = None, dire if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + + k = self.kernel(x_data,x_pred,hps,self) f = self.mean_function(x_pred,hps,self) @@ -1255,6 +1287,8 @@ def posterior_covariance(self, x_pred, x_out = None, variance_only = False, add_ If True the compuation of the posterior covariance matrix is avoided which can save compute time. In that case the return will only provide the variance at the input points. Default = False. + add_noise : bool, optional + If True the noise variances will be added to the posterior variances. Default = False. Return ------ solution dictionary : dict @@ -1263,8 +1297,9 @@ def posterior_covariance(self, x_pred, x_out = None, variance_only = False, add_ x_data = self.x_data.copy() if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") k = self.kernel(x_data,x_pred,self.hyperparameters,self) kk = self.kernel(x_pred, x_pred,self.hyperparameters,self) @@ -1320,8 +1355,9 @@ def posterior_covariance_grad(self, x_pred, x_out = None, direction = None): x_data = self.x_data.copy() if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") k = self.kernel(x_data,x_pred,self.hyperparameters,self) k_covariance_prod = self._KVsolve(k) @@ -1373,8 +1409,10 @@ def joint_gp_prior(self, x_pred, x_out = None): x_data, K, prior_mean_vec = self.x_data.copy(), self.K.copy(), self.prior_mean_vec.copy() if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + k = self.kernel(x_data,x_pred,self.hyperparameters,self) kk = self.kernel(x_pred, x_pred,self.hyperparameters,self) @@ -1407,8 +1445,11 @@ def joint_gp_prior_grad(self, x_pred, direction, x_out = None): x_data, K, prior_mean_vec = self.x_data.copy(), self.K.copy(), self.prior_mean_vec.copy() if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + + k = self.kernel(x_data,x_pred,self.hyperparameters,self) kk = self.kernel(x_pred, x_pred,self.hyperparameters,self) @@ -1465,8 +1506,10 @@ def gp_entropy(self, x_pred, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + priors = self.joint_gp_prior(x_pred, x_out = None) S = priors["S"] @@ -1493,8 +1536,11 @@ def gp_entropy_grad(self, x_pred, direction, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + + priors1 = self.joint_gp_prior(x_pred, x_out = None) priors2 = self.joint_gp_prior_grad(x_pred,direction, x_out = None) @@ -1576,8 +1622,10 @@ def gp_kl_div(self, x_pred, comp_mean, comp_cov, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + res = self.posterior_mean(x_pred, x_out = None) gp_mean = res["f(x)"] @@ -1610,8 +1658,10 @@ def gp_kl_div_grad(self, x_pred, comp_mean, comp_cov, direction, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + gp_mean = self.posterior_mean(x_pred, x_out = None)["f(x)"] gp_mean_grad = self.posterior_mean_grad(x_pred,direction = direction, x_out = None)["df/dx"] @@ -1649,6 +1699,9 @@ def gp_mutual_information(self,x_pred, x_out = None): """ Function to calculate the mutual information between the random variables f(x_data) and f(x_pred). + The mutual information is always positive, as it is a KL divergence, and is bounded + from below by 0. The maxima are expected at the data points. Zero is expected far from the + data support. Parameters ---------- x_pred : np.ndarray @@ -1662,13 +1715,16 @@ def gp_mutual_information(self,x_pred, x_out = None): solution dictionary : {} Information gain of collective points. """ - x_data, K = self.x_data.copy(), self.K.copy() + x_data, K = self.x_data.copy(), self.K.copy() + (np.identity(len(self.K)) * 1e-9) if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") - k = self.kernel(x_data,x_pred,self.hyperparameters,self) + + + k = self.kernel(x_data,x_pred,self.hyperparameters,self) kk = self.kernel(x_pred, x_pred,self.hyperparameters,self) + (np.identity(len(x_pred)) * 1e-9) @@ -1683,8 +1739,12 @@ def gp_total_correlation(self,x_pred, x_out = None): """ Function to calculate the interaction information between the random variables f(x_data) and f(x_pred). This is the mutual information - of each f(x_pred) with f(x_data). It is also called the MUltiinformation. - It best used when several prediction points are supposed to be mutually aware. + of each f(x_pred) with f(x_data). It is also called the Multiinformation. + It is best used when several prediction points are supposed to be mutually aware. + The total correlation is always positive, as it is a KL divergence, and is bounded + from below by 0. The maxima are expected at the data points. Zero is expected far from the + data support. + Parameters ---------- x_pred : np.ndarray @@ -1698,11 +1758,14 @@ def gp_total_correlation(self,x_pred, x_out = None): solution dictionary : {} Information gain of collective points. """ - x_data, K = self.x_data.copy(), self.K.copy() + x_data, K = self.x_data.copy(), self.K.copy() + (np.identity(len(self.K)) * 1e-9) if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + + k = self.kernel(x_data,x_pred,self.hyperparameters,self) kk = self.kernel(x_pred, x_pred,self.hyperparameters,self) + (np.identity(len(x_pred)) * 1e-9) @@ -1718,11 +1781,12 @@ def gp_total_correlation(self,x_pred, x_out = None): ########################################################################### def shannon_information_gain(self, x_pred, x_out = None): """ - Function to compute the shannon-information --- the predicted drop in entropy --- given + Function to compute the shannon-information --- a well-behaved function + of the predicted drop in entropy --- given a set of points. The shannon_information gain is a scalar, it is proportionate to the mutual infomation of the two random variables f(x_pred) and f(x_data). - The mutual information is always positive, as it is a KL divergence, and is bound - from below by 0. The maxima are expcetd at the data points. Zero is expected far from the + The mutual information is always positive, as it is a KL divergence, and is bounded + from below by 0. The maxima are expected at the data points. Zero is expected far from the data support. This shannon information gain is exp(-total correlation). Parameters ---------- @@ -1739,8 +1803,10 @@ def shannon_information_gain(self, x_pred, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + return {"x": x_pred, "sig":np.exp(-self.gp_total_correlation(x_pred, x_out = None)["total correlation"])} @@ -1762,8 +1828,11 @@ def shannon_information_gain_vec(self, x_pred, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + + sig = np.zeros((len(x_pred))) for i in range(len(x_pred)): @@ -1789,8 +1858,11 @@ def posterior_probability(self, x_pred, comp_mean, comp_cov, x_out = None): """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + + res = self.posterior_mean(x_pred, x_out = None) gp_mean = res["f(x)"] @@ -1829,8 +1901,10 @@ def posterior_probability_grad(self, x_pred, comp_mean, comp_cov, direction, x_o """ if isinstance(x_pred,np.ndarray): if np.ndim(x_pred) == 1: raise Exception("x_pred has to be a 2d numpy array, not 1d") + if x_out is not None: x_pred = self._cartesian_product_euclid(x_pred,x_out) if len(x_pred[0]) != self.input_space_dim: raise Exception("Wrong dimensionality of the input points x_pred.") - if x_out is not None: x_pred = self._cartesian_product(x_pred,x_out) + elif x_out is not None: raise Exception("Multi-task GPs on non-Euclidean spaces not implemented yet.") + x1 = np.array(x_pred) x2 = np.array(x_pred) @@ -2189,7 +2263,7 @@ def wendland_anisotropic(self,x1,x2, hps, obj): d = np.sqrt(distance_matrix) d[d > 1.] = 1. kernel = (1.-d)**8 * (35.*d**3 + 25.*d**2 + 8.*d + 1.) - return kernel + return hps[0] * kernel def non_stat_kernel(self,x1,x2,x0,w,l): @@ -2355,7 +2429,7 @@ def _normalize_x_pred(self, x_pred, x_min, x_max): new_x_pred[:,i] = (x_pred[:,i] - x_min[i]) / (x_max[i] - x_min[i]) return new_x_pred - def _cartesian_product(self,x,y): + def _cartesian_product_euclid(self,x,y): """ Input x,y have to be 2d numpy arrays The return is the cartesian product of the two sets @@ -2366,6 +2440,17 @@ def _cartesian_product(self,x,y): res.append(np.append(x[j],y[i])) return np.array(res) + def _cartesian_product_noneuclid(self,x,y): + """ + Input x,y have to be 2d numpy arrays + The return is the cartesian product of the two sets + """ + res = [] + for i in range(len(y)): + for j in range(len(x)): + res.append([x[j],y[i]]) + return res + #################################################################################### #################################################################################### #######################VALIDATION################################################### @@ -2474,12 +2559,13 @@ def make_1d_x_pred(self, b, res = 100): #pragma: no cover #################################################################################### def wendland_anisotropic_gp2Scale_cpu(x1,x2, hps, obj): distance_matrix = np.zeros((len(x1),len(x2))) - for i in range(len(x1[0])): distance_matrix += abs(np.subtract.outer(x1[:,i],x2[:,i])/hps[1+i])**2 + for i in range(len(x1[0])): distance_matrix += (np.subtract.outer(x1[:,i],x2[:,i])/hps[1+i])**2 d = np.sqrt(distance_matrix) d[d > 1.] = 1. kernel = hps[0] * (1.-d)**8 * (35.*d**3 + 25.*d**2 + 8.*d + 1.) return kernel + def wendland_anisotropic_gp2Scale_gpu(x1,x2, hps, obj): # pragma: no cover import torch cuda_device = torch.device("cuda:0") @@ -2496,6 +2582,5 @@ def _get_distance_matrix_gpu(x1,x2,device,hps): # pragma: no cover import torch d = torch.zeros((len(x1),len(x2))).to(device, dtype = torch.float32) for i in range(x1.shape[1]): - d += ((x1[:,i].reshape(-1, 1) - x2[:,i])/hps[i])**2 + d += ((x1[:,i].reshape(-1, 1) - x2[:,i])/hps[1+i])**2 return torch.sqrt(d) - diff --git a/fvgp/gp2Scale.py b/fvgp/gp2Scale.py index ac0dec0..efbfd52 100755 --- a/fvgp/gp2Scale.py +++ b/fvgp/gp2Scale.py @@ -2,52 +2,22 @@ ############################################################ ############################################################ -import dask.distributed as distributed -from .sparse_matrix import gp2ScaleSparseMatrix +import itertools import time +from functools import partial +import dask.distributed as distributed +import numpy as np import scipy.sparse as sparse -class gp2Scale(): - """ - This class allows the user to scale GPs up to millions of datapoints. There is full high-performance-computing - support through DASK. - - Parameters - ---------- - x_data : np.ndarray - The point positions. Shape (V x D), where D is the `input_space_dim`. - batch_size : int, optional - The covariance is divided into batches of the defined size for distributed computing. Default = 10000 - LUtimeout : int, optional (future release) - Controls the timeout for the LU decomposition. - gp_kernel_function : Callable, optional - A function that calculates the covariance between datapoints. It accepts as input x1 (a V x D array of positions), - x2 (a U x D array of positions), hyperparameters (a 1-D array of length D+1 for the default kernel), and a - `gpcam.gp_optimizer.GPOptimizer` instance. The default is a stationary anisotropic kernel - (`fvgp.gp.GP.default_kernel`). - gp_mean_function : Callable, optional - A function that evaluates the prior mean at an input position. It accepts as input - an array of positions (of size V x D), hyperparameters (a 1-D array of length D+1 for the default kernel) - and a `gpcam.gp_optimizer.GPOptimizer` instance. The return value is a 1-D array of length V. If None is provided, - `fvgp.gp.GP.default_mean_function` is used. - a finite difference scheme is used. - covariance_dask_client : dask.distributed.client, optional - The client used for the covariance computation. If none is provided a local client will be used. - info : bool, optional - Controls the output of the algorithm for tests. The default is False - args : user defined, optional - These optional arguments will be available as attribute in kernel and mean function definitions. - - """ +class gp2Scale(): def __init__( self, x_data, - batch_size = 10000, - gp_kernel_function = None, - LUtimeout = 100, - covariance_dask_client = None, - info = False, - ): + batch_size=10000, + gp_kernel_function=None, + covariance_dask_client=None, + info=False, + ): """ The constructor for the gp class. type help(GP) for more information about attributes, methods and their parameters @@ -56,19 +26,19 @@ def __init__( self.point_number = len(x_data) self.num_batches = self.point_number // self.batch_size self.info = info - self.LUtimeout = LUtimeout self.x_data = x_data self.kernel = gp_kernel_function + self.number_of_workers = len(covariance_dask_client.scheduler_info()['workers']) - covariance_dask_client, self.compute_worker_set, self.actor_worker = self._init_dask_client(covariance_dask_client) - ###initiate actor that is a future contain the covariance and methods - self.SparsePriorCovariance = covariance_dask_client.submit(gp2ScaleSparseMatrix,self.point_number, actor=True, workers=self.actor_worker).result() - #self.covariance_dask_client = covariance_dask_client + worker_info = list(covariance_dask_client.scheduler_info()["workers"].keys()) + if not worker_info: raise Exception("No workers available") + self.compute_workers = list(worker_info) - scatter_data = {"x_data":self.x_data.copy()} ##data that can be scattered - self.scatter_future = covariance_dask_client.scatter(scatter_data,workers = self.compute_worker_set) ##scatter the data to compute workers, not the actor + scatter_data = self.x_data ##data that can be scattered + self.scatter_future = covariance_dask_client.scatter( + scatter_data,workers = self.compute_workers ,broadcast = False) ##scatter the data to compute workers, TEST if broadcast is better ################################################################################## ################################################################################## @@ -83,122 +53,60 @@ def _total_number_of_batches(self): Db = float(self.num_batches) return 0.5 * Db * (Db + 1.) - def compute_covariance(self, hyperparameters,client): - """computes the covariance matrix from the kernel on HPC in sparse format""" - ###initialize futures - futures = [] ### a list of futures - actor_futures = [] ###futures of the actor - finished_futures = set() ### a set of finished futures - ###get workers - compute_workers = set(self.compute_worker_set) - #idle_workers = set(compute_workers) - idle_workers = compute_workers.copy() - ###future_worker_assignments - self.future_worker_assignments = {} - ###scatter data - start_time = time.time() - count = 0 - num_batches_i = len(self.x_data) // self.batch_size - num_batches_j = len(self.x_data) // self.batch_size - last_batch_size_i = len(self.x_data) % self.batch_size - last_batch_size_j = len(self.x_data) % self.batch_size - if last_batch_size_i != 0: num_batches_i += 1 - if last_batch_size_j != 0: num_batches_j += 1 - - for i in range(num_batches_i): - beg_i = i * self.batch_size - end_i = min((i+1) * self.batch_size, self.point_number) - batch1 = self.x_data[beg_i: end_i] - for j in range(i,num_batches_j): - beg_j = j * self.batch_size - end_j = min((j+1) * self.batch_size, self.point_number) - batch2 = self.x_data[beg_j : end_j] - ##make workers available that are not actively computing - while not idle_workers: - idle_workers, futures, finished_futures = self.free_workers(futures, finished_futures) - if not idle_workers: time.sleep(0.1) - - ####collect finished workers but only if actor is not busy, otherwise do it later - if len(finished_futures) >= 2000: - actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.copy(), info = self.info)) - #actor_futures = self.clean_actor_futures(actor_futures) - finished_futures = set() - - #get idle worker and submit work - current_worker = self.get_idle_worker(idle_workers) - data = {"scattered_data": self.scatter_future,"hps": hyperparameters, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j)} - futures.append(client.submit(kernel_function, data, workers = current_worker)) - self.assign_future_2_worker(futures[-1].key,current_worker) - if self.info and ( i % 10) == 0.: - print(" submitted batch. i:", beg_i,end_i," j:",beg_j,end_j, "to worker ",current_worker, " Future: ", futures[-1].key,flush = True) - print(" current time stamp: ", time.time() - start_time," percent finished: ",int(100. * float(count)/self._total_number_of_batches()),flush = True) - print("",flush = True) - count += 1 - - if self.info: - print("All tasks submitted after ",time.time() - start_time,flush = True) - print("number of computed batches: ", count) - - actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.union(futures), info = self.info)) - client.gather(actor_futures) - - ######### - if self.info: - print("total prior covariance compute time: ", time.time() - start_time, "Non-zero count: ", self.SparsePriorCovariance.get_result().result().nzz,flush = True) - print("Sparsity: ",self.SparsePriorCovariance.get_result().result().nnz/float(self.point_number)**2,flush = True) - - - def free_workers(self, futures, finished_futures): - free_workers = set() - remaining_futures = [] - for future in futures: - if future.status == "cancelled": print("WARNING: cancelled futures encountered!",flush = True) - if future.status == "finished": - finished_futures.add(future) - free_workers.add(self.future_worker_assignments[future.key]) - del self.future_worker_assignments[future.key] - else: remaining_futures.append(future) - del futures - return free_workers, remaining_futures, finished_futures - - #def clean_actor_futures(self,actor_futures): - # for entry in reversed(actor_futures): - # if entry.status == "finished": - # actor_futures.remove(entry) - # return actor_futures - + @staticmethod + def ranges(N, nb): + """ splits a range(N) into nb chunks defined by chunk_start, chunk_end """ + step = N / nb + return [(round(step * i), round(step * (i + 1))) for i in range(nb)] - def assign_future_2_worker(self, future_key, worker_address): - self.future_worker_assignments[future_key] = worker_address - - def get_idle_worker(self,idle_workers): - return idle_workers.pop() - - def calculate_sparse_noise_covariance(self,vector): + def compute_covariance(self, hyperparameters, client, batched=False): + """computes the covariance matrix from the kernel on HPC in sparse format""" + NUM_RANGES = self.num_batches + ranges = self.ranges(len(self.x_data), NUM_RANGES) # the chunk ranges, as (start, end) tuples + ranges_ij = list( + itertools.product(ranges, ranges)) # all i/j ranges as ((i_start, i_end), (j_start, j_end)) pairs of tuples + ranges_ij = [range_ij for range_ij in ranges_ij if range_ij[0][0] <= range_ij[1][0]] # filter lower diagonal + if batched: + # number of batches shouldn't be less than the number of workers + batches = min(len(client.cluster.workers), len(ranges_ij)) + # split ranges_ij into roughly equal batches + ranges_ij = [ranges_ij[i::batches] for i in range(batches)] + + kernel_caller = kernel_function_batched + else: + kernel_caller = kernel_function + + ##scattering + results = list(map(self.harvest_result, + distributed.as_completed(client.map( + partial(kernel_caller, + hyperparameters=hyperparameters, + kernel=self.kernel), + ranges_ij, + [self.scatter_future] * len(ranges_ij)), + with_results=True))) + + #reshape the result set into COO components + data, i_s, j_s = map(np.hstack, zip(*results)) + # mirror across diagonal + diagonal_mask = i_s != j_s + data, i_s, j_s = np.hstack([data, data[diagonal_mask]]), \ + np.hstack([i_s, j_s[diagonal_mask]]), \ + np.hstack([j_s, i_s[diagonal_mask]]) + return sparse.coo_matrix((data, (i_s, j_s))) + + + @staticmethod + def harvest_result(future_result): + future, result = future_result + future.release() + return result + + def calculate_sparse_noise_covariance(self, vector): diag = sparse.eye(len(vector), format="coo") diag.setdiag(vector) return diag - ################################################################################## - ################################################################################## - ################################################################################## - ################################################################################## - ######################DASK######################################################## - ################################################################################## - ################################################################################## - ################################################################################## - ################################################################################## - def _init_dask_client(self,dask_client): - if dask_client is None: dask_client = distributed.Client() - worker_info = list(dask_client.scheduler_info()["workers"].keys()) - if not worker_info: raise Exception("No workers available") - actor_worker = worker_info[0] - compute_worker_set = set(worker_info[1:]) - print("We have ", len(compute_worker_set)," compute workers ready to go.") - print("Actor on", actor_worker) - print("Scheduler Address: ", dask_client.scheduler_info()["address"]) - return dask_client, compute_worker_set,actor_worker - ######################################################################### ######################################################################### @@ -214,19 +122,20 @@ def _init_dask_client(self,dask_client): ######################################################################### ######################################################################### ######################################################################### -class gpm2Scale(gp2Scale): # pragma: no cover - def __init__(self,input_space_dim, +class gpm2Scale(gp2Scale): # pragma: no cover + def __init__(self, input_space_dim, output_space_dim, - x_data, ##data in the original input space + x_data, ##data in the original input space init_hyperparameters, batch_size, - variances = None, - init_y_data = None, #initial latent space positions - gp_kernel_function = None, - gp_mean_function = None, covariance_dask_client = None, - info = False): - - if input_space_dim != len(x_data[0]): raise ValueError("input space dimensions are not in agreement with the point positions given") + variances=None, + init_y_data=None, # initial latent space positions + gp_kernel_function=None, + gp_mean_function=None, covariance_dask_client=None, + info=False): + + if input_space_dim != len(x_data[0]): raise ValueError( + "input space dimensions are not in agreement with the point positions given") self.input_dim = input_space_dim self.output_dim = output_space_dim self.x_data = x_data @@ -241,59 +150,67 @@ def __init__(self,input_space_dim, ########################################## if variances is None: self.variances = np.ones((self.x_data.shape[0])) * \ - abs(np.mean(self.x_data) / 100.0) + abs(np.mean(self.x_data) / 100.0) print("CAUTION: you have not provided data variances in fvGP,") print("they will be set to 1 percent of the data values!") - if len(self.variances[self.variances < 0]) > 0: raise Exception("Negative measurement variances communicated to fvgp.") + if len(self.variances[self.variances < 0]) > 0: raise Exception( + "Negative measurement variances communicated to fvgp.") ########################################## #######define kernel and mean function#### ########################################## - if gp_kernel_function == "robust": self.kernel = sparse_stat_kernel_robust - elif callable(gp_kernel_function): self.kernel = gp_kernel_function - else: raise Exception("A kernel callable has to be provided!") + if gp_kernel_function == "robust": + self.kernel = sparse_stat_kernel_robust + elif callable(gp_kernel_function): + self.kernel = gp_kernel_function + else: + raise Exception("A kernel callable has to be provided!") self.gp_mean_function = gp_mean_function - if callable(gp_mean_function): self.mean_function = gp_mean_function - else: self.mean_function = self.default_mean_function + if callable(gp_mean_function): + self.mean_function = gp_mean_function + else: + self.mean_function = self.default_mean_function ########################################## #######prepare hyper parameters########### ########################################## self.hyperparameters = np.array(init_hyperparameters) ########################################## - #compute the prior######################## + # compute the prior######################## ########################################## - covariance_dask_client, self.compute_worker_set, self.actor_worker = self._init_dask_client(covariance_dask_client) + covariance_dask_client, self.compute_worker_set, self.actor_worker = self._init_dask_client( + covariance_dask_client) ###initiate actor that is a future contain the covariance and methods - self.SparsePriorCovariance = covariance_dask_client.submit(gp2ScaleSparseMatrix,self.point_number, actor=True, workers=self.actor_worker).result()# Create Actor + self.SparsePriorCovariance = covariance_dask_client.submit(gp2ScaleSparseMatrix, self.point_number, actor=True, + workers=self.actor_worker).result() # Create Actor self.covariance_dask_client = covariance_dask_client - scatter_data = {"x_data":self.x_data} ##data that can be scattered - self.scatter_future = covariance_dask_client.scatter(scatter_data,workers = self.compute_worker_set) ##scatter the data + scatter_data = {"x_data": self.x_data} ##data that can be scattered + self.scatter_future = covariance_dask_client.scatter(scatter_data, + workers=self.compute_worker_set) ##scatter the data self.st = time.time() - self.compute_covariance(self.y_data,self.y_data,self.hyperparameters,variances,covariance_dask_client) + self.compute_covariance(self.y_data, self.y_data, self.hyperparameters, variances, covariance_dask_client) if self.info: sp = self.SparsePriorCovariance.get_result().result() print("gp2Scale successfully initiated, here is some info about the prior covariance matrix:") print("non zero elements: ", sp.nnz) - print("Size in GBits: ", sp.data.nbytes/1e9) - print("Sparsity: ",sp.nnz/float(self.point_number)**2) + print("Size in GBits: ", sp.data.nbytes / 1e9) + print("Sparsity: ", sp.nnz / float(self.point_number) ** 2) + if self.point_number <= 5000: print("Here is an image:") plt.imshow(sp.toarray()) plt.show() - - def total_number_of_batches(self): Db = float(self.num_batches) return 0.5 * Db * (Db + 1.) - def compute_covariance(self, hyperparameters, variances,client): + def compute_covariance(self, hyperparameters, variances, client): """computes the covariance matrix from the kernel on HPC in sparse format""" ###initialize futures - futures = [] ### a list of futures + futures = [] ### a list of futures actor_futures = [] finished_futures = set() ###get workers @@ -313,12 +230,12 @@ def compute_covariance(self, hyperparameters, variances,client): for i in range(num_batches_i): beg_i = i * self.batch_size - end_i = min((i+1) * self.batch_size, self.point_number) + end_i = min((i + 1) * self.batch_size, self.point_number) batch1 = self.x_data[beg_i: end_i] - for j in range(i,num_batches_j): + for j in range(i, num_batches_j): beg_j = j * self.batch_size - end_j = min((j+1) * self.batch_size, self.point_number) - batch2 = self.x_data[beg_j : end_j] + end_j = min((j + 1) * self.batch_size, self.point_number) + batch2 = self.x_data[beg_j: end_j] ##make workers available that are not actively computing while not idle_workers: idle_workers, futures, finished_futures = self.free_workers(futures, finished_futures) @@ -329,38 +246,36 @@ def compute_covariance(self, hyperparameters, variances,client): actor_futures.append(self.SparsePriorCovariance.get_future_results(set(finished_futures))) finished_futures = set() - #get idle worker and submit work + # get idle worker and submit work current_worker = self.get_idle_worker(idle_workers) - data = {"scattered_data": self.scatter_future,"hps": hyperparameters, "kernel" :self.kernel, "range_i": (beg_i,end_i), "range_j": (beg_j,end_j), "mode": "prior","gpu": 0} - futures.append(client.submit(kernel_function, data, workers = current_worker)) - self.assign_future_2_worker(futures[-1].key,current_worker) + data = {"scattered_data": self.scatter_future, "hps": hyperparameters, "kernel": self.kernel, + "range_i": (beg_i, end_i), "range_j": (beg_j, end_j), "mode": "prior", "gpu": 0} + futures.append(client.submit(kernel_function, data, workers=current_worker)) + self.assign_future_2_worker(futures[-1].key, current_worker) if self.info: - print(" submitted batch. i:", beg_i,end_i," j:",beg_j,end_j, "to worker ",current_worker, " Future: ", futures[-1].key) - print(" current time stamp: ", time.time() - start_time," percent finished: ",float(count)/self.total_number_of_batches()) + print(" submitted batch. i:", beg_i, end_i, " j:", beg_j, end_j, "to worker ", current_worker, + " Future: ", futures[-1].key) + print(" current time stamp: ", time.time() - start_time, " percent finished: ", + float(count) / self.total_number_of_batches()) print("") count += 1 if self.info: - print("All tasks submitted after ",time.time() - start_time,flush = True) + print("All tasks submitted after ", time.time() - start_time, flush=True) print("number of computed batches: ", count) actor_futures.append(self.SparsePriorCovariance.get_future_results(finished_futures.union(futures))) - #actor_futures[-1].result() + # actor_futures[-1].result() client.gather(actor_futures) - actor_futures.append(self.SparsePriorCovariance.add_to_diag(variances)) ##add to diag on actor + actor_futures.append(self.SparsePriorCovariance.add_to_diag(variances)) ##add to diag on actor actor_futures[-1].result() - #clean up - #del futures - #del actor_futures - #del finished_futures - #del scatter_future + # clean up + # del futures + # del actor_futures + # del finished_futures + # del scatter_future ######### - if self.info: - print("total prior covariance compute time: ", time.time() - start_time, "Non-zero count: ", self.SparsePriorCovariance.get_result().result().nnz) - print("Sparsity: ",self.SparsePriorCovariance.get_result().result().nnz/float(self.point_number)**2) - - def free_workers(self, futures, finished_futures): free_workers = set() remaining_futures = [] @@ -370,14 +285,16 @@ def free_workers(self, futures, finished_futures): finished_futures.add(future) free_workers.add(self.future_worker_assignments[future.key]) del self.future_worker_assignments[future.key] - else: remaining_futures.append(future) + else: + remaining_futures.append(future) return free_workers, remaining_futures, finished_futures def assign_future_2_worker(self, future_key, worker_address): self.future_worker_assignments[future_key] = worker_address - def get_idle_worker(self,idle_workers): + def get_idle_worker(self, idle_workers): return idle_workers.pop() + ################################################################################## ################################################################################## ################################################################################## @@ -387,27 +304,26 @@ def get_idle_worker(self,idle_workers): ################################################################################## ################################################################################## ################################################################################## - def _init_dask_client(self,dask_client): + def _init_dask_client(self, dask_client): if dask_client is None: dask_client = distributed.Client() worker_info = list(dask_client.scheduler_info()["workers"].keys()) if not worker_info: raise Exception("No workers available") actor_worker = worker_info[0] compute_worker_set = set(worker_info[1:]) - print("We have ", len(compute_worker_set)," compute workers ready to go.") + print("We have ", len(compute_worker_set), " compute workers ready to go.") print("Actor on", actor_worker) print("Scheduler Address: ", dask_client.scheduler_info()["address"]) - return dask_client, compute_worker_set,actor_worker + return dask_client, compute_worker_set, actor_worker - def train(self, - hyperparameter_bounds, - method = "global", - init_hyperparameters = None, - max_iter = 120, - ): + hyperparameter_bounds, + method="global", + init_hyperparameters=None, + max_iter=120, + ): """ This function finds the maximum of the log_likelihood and therefore trains the fvGP (synchronously). - This can be done on a remote cluster/computer by specifying the method to be be 'hgdl' and + This can be done on a remote cluster/computer by specifying the method to be be 'hgdl' and providing a dask client inputs: @@ -430,7 +346,7 @@ def train(self, ############################################ if init_hyperparameters is None: init_hyperparameters = np.array(self.hyperparameters) - print("fvGP training started with ",len(self.x_data)," data points") + print("fvGP training started with ", len(self.x_data), " data points") ###################### #####TRAINING######### ###################### @@ -439,32 +355,30 @@ def train(self, np.array(hyperparameter_bounds), max_iter, method - ) - #print("computing the prior") - #self.compute_prior_fvGP_pdf(self.covariance_dask_client) + ) + # print("computing the prior") + # self.compute_prior_fvGP_pdf(self.covariance_dask_client) self.y_data = self.hyperparameters[0:-2].reshape(self.point_number, self.output_dim) np.save("output_points", self.y_data) ###################### ###################### ###################### - - def optimize_log_likelihood(self, - starting_hps, - hp_bounds, - max_iter, - method - ): + starting_hps, + hp_bounds, + max_iter, + method + ): - #start_log_likelihood = self.log_likelihood(starting_hps, recompute_xK = False) + # start_log_likelihood = self.log_likelihood(starting_hps, recompute_xK = False) if method == "mcmc": print("MCMC started in fvGP") - print('bounds are',hp_bounds) - res = mcmc(self.log_likelihood,hp_bounds,max_iter = max_iter, x0 = starting_hps) + print('bounds are', hp_bounds) + res = mcmc(self.log_likelihood, hp_bounds, max_iter=max_iter, x0=starting_hps) hyperparameters = np.array(res["x"]) self.mcmc_info = res - print("MCMC has found solution: ", hyperparameters, "with log marginal_likelihood ",res["f(x)"]) + print("MCMC has found solution: ", hyperparameters, "with log marginal_likelihood ", res["f(x)"]) elif method == "global": res = differential_evolution(self.neg_log_likelihood, hp_bounds) self.hyperparameters = hyperparameters @@ -483,15 +397,14 @@ def log_likelihood(self, y): y1 = y2 = x[0:-2].reshape(self.point_number, self.output_dim) self.SparsePriorCovariance.reset_prior().result() hps = x[-2:] - self.compute_covariance(y1,y2,hps, self.variances,client) + self.compute_covariance(y1, y2, hps, self.variances, client) logdet = self.SparsePriorCovariance.logdet().result() n = len(y) x = self.x_data traceKXX = self.SparsePriorCovariance.traceKXX(x).result() - res = -(0.5 * traceKXX) - (dim * 0.5 * logdet) - (0.5 * dim *n * np.log(2.0*np.pi)) + res = -(0.5 * traceKXX) - (dim * 0.5 * logdet) - (0.5 * dim * n * np.log(2.0 * np.pi)) return res - def neg_log_likelihood(self, y): """ computes the marginal log-likelihood @@ -505,12 +418,12 @@ def neg_log_likelihood(self, y): y1 = y2 = y[0:-2].reshape(self.point_number, self.output_dim) self.SparsePriorCovariance.reset_prior().result() hps = y[-2:] - self.compute_covariance(y1,y2,hps, self.variances,client) + self.compute_covariance(y1, y2, hps, self.variances, client) logdet = self.SparsePriorCovariance.logdet().result() n = len(y) x = self.x_data traceKXX = self.SparsePriorCovariance.traceKXX(x).result() - res = (0.5 * traceKXX) + (dim * 0.5 * logdet) + (0.5 * dim *n * np.log(2.0*np.pi)) + res = (0.5 * traceKXX) + (dim * 0.5 * logdet) + (0.5 * dim * n * np.log(2.0 * np.pi)) print("res") print("") return res @@ -521,16 +434,44 @@ def neg_log_likelihood(self, y): ######################################################################### ######################################################################### ######################################################################### -def kernel_function(data): # pragma: no cover +def kernel_function(range_ij, scatter_future, hyperparameters, kernel): + """ + Essentially, parameters other than range_ij are static across calls. range_ij defines the region of the covariance matrix being calculated. + Rather than return a sparse array in local coordinates, we can return the COO components in global coordinates. + """ st = time.time() - hps= data["hps"] - kernel = data["kernel"] - worker = distributed.get_worker() - x1 = data["scattered_data"]["x_data"][data["range_i"][0]:data["range_i"][1]] - x2 = data["scattered_data"]["x_data"][data["range_j"][0]:data["range_j"][1]] - range1 = data["range_i"] - range2 = data["range_j"] - k = kernel(x1,x2,hps, None) + hps = hyperparameters + range_i, range_j = range_ij + x1 = scatter_future[range_i[0]:range_i[1]] + x2 = scatter_future[range_j[0]:range_j[1]] + + k = kernel(x1, x2, hps, None) k_sparse = sparse.coo_matrix(k) - #print(time.time() - st, flush = True) - return k_sparse, (data["range_i"][0],data["range_j"][0]), time.time() - st, worker.address + + #print("kernel compute time: ", time.time() - st, flush = True) + data, rows, cols = k_sparse.data, k_sparse.row + range_i[0], k_sparse.col + range_j[0] + + # mask lower triangular values when current chunk spans diagonal + if range_i[0] == range_j[0]: + mask = [row <= col for (row, col) in zip(rows, cols)] + return data[mask], rows[mask], cols[mask] + else: + return data, rows, cols + + +def kernel_function_batched(range_ijs, scatter_future, hyperparameters, kernel): + """ + Essentially, parameters other than range_ij are static across calls. range_ij defines the region of the covariance matrix being calculated. + Rather than return a sparse array in local coordinates, we can return the COO components in global coordinates. + """ + data = [] + rows = [] + cols = [] + + for range_ij in range_ijs: + data_ij, rows_ij, cols_ij = kernel_function(range_ij, scatter_future, hyperparameters, kernel) + data.append(data_ij) + rows.append(rows_ij) + cols.append(cols_ij) + + return np.hstack(data), np.hstack(rows), np.hstack(cols) diff --git a/fvgp/mcmc.py b/fvgp/mcmc.py index 9adace6..ae32505 100755 --- a/fvgp/mcmc.py +++ b/fvgp/mcmc.py @@ -66,6 +66,7 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, start_time = time.time() n_updates = max(n_updates,2) + if np.ndim(x0) != 1: raise Exception("x0 is not a vector in MCMC") if x0 is None: x0 = np.ones((len(bounds))) if prior_args is None: prior_args = bounds if prior_fn is None: prior_fn = prior_func @@ -75,23 +76,23 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, ctime = [] if type(x0).__module__!='numpy' or isinstance(x0, np.float64): x0 = np.array(x0) - p = x0.size + p = len(x0) invalid = False # If the supplied proposal covariance matrix is either not given or invalid, # just use the identity. if np.any(np.isnan(prop_Sigma)) or prop_Sigma.size != p**2: - prop_Sigma = np.eye(p) - prop_C = np.eye(p) + axis_std = np.linalg.norm(bounds, axis = 1)/10. + prop_Sigma = np.diag(axis_std**2) + prop_C = np.linalg.cholesky(prop_Sigma) invalid = True else: try: # Initialize prop_C prop_C = np.linalg.cholesky(prop_Sigma) - except np.linalg.LinAlgError: + except np.linalg.LinAlgError: prop_Sigma = np.eye(p) prop_C = np.eye(p) invalid = True - #if invalid: print("Invalid or missing proposal covariance matrix. Using identity.\n") # Initialize sigma_m to the rule of thumb sigma_m = 2.4**2/p r_hat = 0 @@ -108,7 +109,7 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, prop_Sigma_trace[0,:,:] = prop_Sigma # Initialize Metropolis theta = x0 - likelihood = likelihood_fn(theta) + likelihood = likelihood_fn(hyperparameters = theta) prior = prior_fn(theta, prior_args) ######################################################### # Begin main loop @@ -116,7 +117,7 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, theta_star= theta + sigma_m * np.random.standard_normal(p) @ prop_C prior_star = prior_fn(theta_star, prior_args) if prior_star != -np.inf: - likelihood_star = likelihood_fn(theta_star) + likelihood_star = likelihood_fn(hyperparameters = theta_star) if np.isnan(likelihood_star): likelihood_star = -np.inf metr_ratio = np.exp(prior_star + likelihood_star - prior - likelihood) @@ -146,8 +147,6 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, check_chol_cont = False except np.linalg.LinAlgError: prop_Sigma = prop_Sigma + eps*np.eye(p) - #print("Oops. Proposal covariance matrix is now:\n") - #print(prop_Sigma) # Update the trace objects trace[:, i] = theta x = np.asarray(trace.T) @@ -159,7 +158,7 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, prop_Sigma_trace[i,:,:] = prop_Sigma # Echo every 100 iterations if info: - if (i % 100) == 0: print("Finished "+str(i)+ " out of " + str(n_updates), " iterations.\n") + if (i % 100) == 0: print("Finished "+str(i)+ " out of " + str(n_updates), " iterations. f(x)=",likelihood) if len(x)>201 and np.linalg.norm(np.mean(x[-100:],axis = 0)-np.mean(x[-200:-100],axis = 0)) < 0.01 * np.linalg.norm(np.mean(x[-100:],axis = 0)): break # End main loop ######################################################### @@ -176,10 +175,9 @@ def mcmc(likelihood_fn, bounds, x0 = None, n_updates = 10000, return {"f(x)": f[arg_max], "x":x[arg_max], "F":f, - "distribution": x[int(len(x) - (len(x)/10)):], + "stripped distribution": x[int(len(x) - (len(x)/10)):], "full distribution": x, "distribution mean": np.mean(x[int(len(x) - (len(x)/10)):],axis = 0), - "distribution var": np.var(x[int(len(x) - (len(x)/10)):],axis = 0), - "compute time" : ctime, - "meta": res} + "distribution var": np.var(x[int(len(x) - (len(x)/10)):],axis = 0), + "compute time" : ctime} diff --git a/fvgp/sparse_matrix.py b/fvgp/sparse_matrixOBS.py similarity index 93% rename from fvgp/sparse_matrix.py rename to fvgp/sparse_matrixOBS.py index dc04829..1d7363a 100755 --- a/fvgp/sparse_matrix.py +++ b/fvgp/sparse_matrixOBS.py @@ -46,11 +46,11 @@ def get_future_results(self, futures, info = False): res = [] for future in futures: SparseCov_sub, ranges, ketime, worker = future.result() - if info: print("Collected Future ", future.key, " has finished its work in", ketime," seconds. time stamp: ",time.time() - self.st, flush = True) + #if info: print("Collected Future ", future.key, " has finished its work in", ketime," seconds. time stamp: ",time.time() - self.st, flush = True) res.append((SparseCov_sub,ranges[0],ranges[1])) self.insert_many(res) - if info: print(" Size of the current covariance matrix: ", self.K.count_nonzero(), flush = True) + #if info: print(" Size of the current covariance matrix: ", self.K.count_nonzero(), flush = True) del futures return 0 diff --git a/requirements.txt b/requirements.txt index 6cb26dd..a1b812d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ scipy == 1.11.2 numpy == 1.23.5 dask >= 2021.6.2 distributed >= 2021.6.2 -hgdl == 2.1.2 +hgdl == 2.1.3 notebook loguru imate diff --git a/tests/test_fvgp.py b/tests/test_fvgp.py index 9bec85d..f49c234 100755 --- a/tests/test_fvgp.py +++ b/tests/test_fvgp.py @@ -18,6 +18,7 @@ import datetime import sys from dask.distributed import performance_report +from distributed.utils_test import gen_cluster, client, loop, cluster_fixture, loop_in_thread, cleanup @@ -32,157 +33,156 @@ x_pred = np.random.rand(10, input_dim) -class Test_fvGP(unittest.TestCase): - """Tests for `fvgp` package.""" - def test_single_task_init_basic(self): - my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), compute_device = 'cpu') - my_gp1 = GP(input_dim, x_data, y_data) - my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), normalize_y = True) - my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), store_inv = False) - my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), args = {'a':2.}) - my_gp1 = GP(input_dim, x_data, y_data, sparse_mode = True) - - - my_gp1.update_gp_data(x_data, y_data, noise_variances = np.ones((y_data.shape)) * 0.01) - my_gp1.update_gp_data(x_data, y_data) - res = my_gp1.posterior_mean(x_pred) - res = my_gp1.posterior_mean_grad(x_pred,direction=0) - res = my_gp1.posterior_mean_grad(x_pred) - res = my_gp1.posterior_covariance(x_pred) - res = my_gp1.posterior_covariance_grad(x_pred,direction=0) - res = my_gp1.gp_entropy(x_pred) - res = my_gp1.shannon_information_gain(x_pred) - res = my_gp1.squared_exponential_kernel(1,1) - res = my_gp1.squared_exponential_kernel_robust(1,1) - res = my_gp1.exponential_kernel(1,1) - res = my_gp1.exponential_kernel_robust(1,1) - res = my_gp1.matern_kernel_diff1(1,1) - res = my_gp1.matern_kernel_diff1_robust(1,1) - res = my_gp1.matern_kernel_diff2(1,1) - res = my_gp1.matern_kernel_diff2_robust(1,1) - res = my_gp1.sparse_kernel(1,1) - res = my_gp1.periodic_kernel(1,1,1) - res = my_gp1.default_kernel(x_data,x_data,np.array([1,1,1,1,1,1]),my_gp1) - - def test_single_task_init_advanced(self): - my_gp2 = GP(input_dim, x_data,y_data,np.array([1, 1, 1, 1, 1, 1]),noise_variances=np.zeros(y_data.shape) + 0.01, - compute_device="cpu", normalize_y = True, store_inv = True, ram_economy = True) - - def test_train_basic(self): - my_gp1 = GP(input_dim, x_data, y_data, np.array([1, 1, 1, 1, 1, 1])) - my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), - method = "local", pop_size = 10, tolerance = 0.001,max_iter = 2) - my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), - method = "global", pop_size = 10, tolerance = 0.001,max_iter = 2) - my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), - method = "hgdl", pop_size = 10, tolerance = 0.001,max_iter = 2) - my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), - method = "mcmc", pop_size = 10, tolerance = 0.001,max_iter = 2) - - res = my_gp1.posterior_mean(np.random.rand(len(x_data),len(x_data[0]))) - res = my_gp1.posterior_mean_grad(np.random.rand(10,len(x_data[0]))) - res = my_gp1.posterior_covariance(np.random.rand(10,len(x_data[0]))) - res = my_gp1.posterior_covariance_grad(np.random.rand(10,len(x_data[0]))) - res = my_gp1.joint_gp_prior(np.random.rand(10,len(x_data[0]))) - res = my_gp1.joint_gp_prior_grad(np.random.rand(10,len(x_data[0])),0) - res = my_gp1.gp_entropy(np.random.rand(10,len(x_data[0]))) - res = my_gp1.gp_entropy_grad(np.random.rand(10,len(x_data[0])),0) - - A = np.random.rand(10,10) - B = A.T @ A - res = my_gp1.entropy(B) - res = my_gp1.gp_kl_div(np.random.rand(10,len(x_data[0])), np.random.rand(10), B) - res = my_gp1.gp_kl_div_grad(np.random.rand(10,len(x_data[0])), np.random.rand(10), B,0) - res = my_gp1.shannon_information_gain(np.random.rand(10,len(x_data[0]))) - res = my_gp1.shannon_information_gain_vec(np.random.rand(10,len(x_data[0]))) - res = my_gp1.posterior_probability(np.random.rand(10,len(x_data[0])), np.random.rand(10), B) - res = my_gp1.posterior_probability_grad(np.random.rand(10,len(x_data[0])), np.random.rand(10), B, direction = 0) - - res = my_gp1.squared_exponential_kernel(1.,1.) - res = my_gp1.squared_exponential_kernel_robust(1.,1.) - res = my_gp1.exponential_kernel(1.,1.) - res = my_gp1.exponential_kernel_robust(1.,1.) - distance = 1. - length = 1.5 - phi = 2. - l = 2. - w = 5. - p = 1. - radius = 3. - - res = my_gp1.matern_kernel_diff1(distance, length) - res = my_gp1.matern_kernel_diff1_robust(distance, phi) - res = my_gp1.matern_kernel_diff2(distance, length) - - res = my_gp1.matern_kernel_diff2_robust(distance, phi) - res = my_gp1.sparse_kernel(distance, radius) - res = my_gp1.periodic_kernel(distance, length, p) - - res = my_gp1.linear_kernel(2.,2.2, 1.,1.,1.) - res = my_gp1.dot_product_kernel(np.random.rand(2),np.random.rand(2),1.,np.array([[1.,0.],[0.,2.]])) - res = my_gp1.polynomial_kernel(np.random.rand(2),np.random.rand(2), 2) - res = my_gp1.default_kernel(x_data,x_data,np.ones((6)),my_gp1) - res = my_gp1.non_stat_kernel(x_data,x_data,np.random.rand(10,5),np.random.rand(10),0.5) - res = my_gp1.non_stat_kernel_gradient(x_data,x_data,np.random.rand(10,5),np.random.rand(10),0.5) - res = my_gp1.wendland_anisotropic(x_data,x_data,np.ones((6)), my_gp1) - - def test_train_hgdl(self): - my_gp2 = GP(input_dim, x_data,y_data,np.array([1, 1, 1, 1, 1, 1]),noise_variances=np.zeros(y_data.shape) + 0.01, - compute_device="cpu", normalize_y = True, store_inv = True, ram_economy = True) - - - my_gp2.train(np.array([[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), - method = "hgdl", tolerance = 0.001, max_iter = 2) - - - def test_train_hgdl_async(self): - my_gp2 = GP(input_dim, x_data,y_data,np.array([1, 1, 1, 1, 1, 1]),noise_variances=np.zeros(y_data.shape) + 0.01, - compute_device="cpu", normalize_y = True, store_inv = True, ram_economy = True) - - opt_obj = my_gp2.train_async(np.array([[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), - max_iter = 5000) - - time.sleep(3) - my_gp2.update_hyperparameters(opt_obj) - my_gp2.stop_training(opt_obj) - my_gp2.kill_training(opt_obj) - my_gp2.set_hyperparameters(np.array([1., 1., 1., 1., 1., 1.])) - my_gp2.get_hyperparameters() - my_gp2.get_prior_pdf() - my_gp2.test_log_likelihood_gradient(np.array([1., 1., 1., 1., 1., 1.])) - - - def test_multi_task(self): - def mkernel(x1,x2,hps,obj): - d = obj._get_distance_matrix(x1,x2) - return hps[0] * obj.matern_kernel_diff1(d,hps[1]) - y_data = np.zeros((N,2)) - y_data[:,0] = np.sin(np.linalg.norm(x_data, axis=1)) - y_data[:,1] = np.cos(np.linalg.norm(x_data, axis=1)) - - my_fvgp = fvGP(input_dim,1,2, x_data, y_data, np.array([1, 1]), gp_kernel_function=mkernel) - my_fvgp.update_gp_data(x_data, y_data) - my_fvgp.train(np.array([[0.01,1],[0.01,10]]), - method = "global", pop_size = 10, tolerance = 0.001, max_iter = 2) - - - def test_gp2Scale(self): - client = Client() - input_dim = 1 - N = 2000 - x_data = np.random.rand(N,input_dim) - y_data = np.sin(np.linalg.norm(x_data,axis = 1) * 5.0) - hps_n = 2 - - hps_bounds = np.array([[0.1,10.], ##signal var of stat kernel - [0.001,0.02] ##length scale for stat kernel - ]) - - init_hps = np.random.uniform(size = len(hps_bounds), low = hps_bounds[:,0], high = hps_bounds[:,1]) - my_gp2S = GP(1, x_data,y_data,init_hps, gp2Scale = True, gp2Scale_batch_size= 1000) - - my_gp2S.train(hps_bounds, max_iter = 2, init_hyperparameters = init_hps) - - x_pred = np.linspace(0,1,1000) - mean1 = my_gp2S.posterior_mean(x_pred.reshape(-1,1))["f(x)"] - var1 = my_gp2S.posterior_covariance(x_pred.reshape(-1,1))["v(x)"] +"""Tests for `fvgp` package.""" +def test_single_task_init_basic(): + my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), compute_device = 'cpu') + my_gp1 = GP(input_dim, x_data, y_data) + my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), normalize_y = True) + my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), store_inv = False) + my_gp1 = GP(input_dim, x_data, y_data, init_hyperparameters = np.array([1, 1, 1, 1, 1, 1]), args = {'a':2.}) + my_gp1 = GP(input_dim, x_data, y_data, sparse_mode = True) + + + my_gp1.update_gp_data(x_data, y_data, noise_variances = np.ones((y_data.shape)) * 0.01) + my_gp1.update_gp_data(x_data, y_data) + res = my_gp1.posterior_mean(x_pred) + res = my_gp1.posterior_mean_grad(x_pred,direction=0) + res = my_gp1.posterior_mean_grad(x_pred) + res = my_gp1.posterior_covariance(x_pred) + res = my_gp1.posterior_covariance_grad(x_pred,direction=0) + res = my_gp1.gp_entropy(x_pred) + res = my_gp1.shannon_information_gain(x_pred) + res = my_gp1.squared_exponential_kernel(1,1) + res = my_gp1.squared_exponential_kernel_robust(1,1) + res = my_gp1.exponential_kernel(1,1) + res = my_gp1.exponential_kernel_robust(1,1) + res = my_gp1.matern_kernel_diff1(1,1) + res = my_gp1.matern_kernel_diff1_robust(1,1) + res = my_gp1.matern_kernel_diff2(1,1) + res = my_gp1.matern_kernel_diff2_robust(1,1) + res = my_gp1.sparse_kernel(1,1) + res = my_gp1.periodic_kernel(1,1,1) + res = my_gp1.default_kernel(x_data,x_data,np.array([1.,1.,1.,1.,1.,1.]),my_gp1) + +def test_single_task_init_advanced(): + my_gp2 = GP(input_dim, x_data,y_data,np.array([1, 1, 1, 1, 1, 1]),noise_variances=np.zeros(y_data.shape) + 0.01, + compute_device="cpu", normalize_y = True, store_inv = True, ram_economy = True) + +def test_train_basic(): + my_gp1 = GP(input_dim, x_data, y_data, np.array([1., 1., 1., 1., 1., 1.])) + my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), + method = "local", pop_size = 10, tolerance = 0.001,max_iter = 2) + my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), + method = "global", pop_size = 10, tolerance = 0.001,max_iter = 2) + my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), + method = "hgdl", pop_size = 10, tolerance = 0.001,max_iter = 2) + my_gp1.train(np.array([[0.01,1],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), + method = "mcmc", pop_size = 10, tolerance = 0.001,max_iter = 2) + + res = my_gp1.posterior_mean(np.random.rand(len(x_data),len(x_data[0]))) + res = my_gp1.posterior_mean_grad(np.random.rand(10,len(x_data[0]))) + res = my_gp1.posterior_covariance(np.random.rand(10,len(x_data[0]))) + res = my_gp1.posterior_covariance_grad(np.random.rand(10,len(x_data[0]))) + res = my_gp1.joint_gp_prior(np.random.rand(10,len(x_data[0]))) + res = my_gp1.joint_gp_prior_grad(np.random.rand(10,len(x_data[0])),0) + res = my_gp1.gp_entropy(np.random.rand(10,len(x_data[0]))) + res = my_gp1.gp_entropy_grad(np.random.rand(10,len(x_data[0])),0) + + A = np.random.rand(10,10) + B = A.T @ A + res = my_gp1.entropy(B) + res = my_gp1.gp_kl_div(np.random.rand(10,len(x_data[0])), np.random.rand(10), B) + res = my_gp1.gp_kl_div_grad(np.random.rand(10,len(x_data[0])), np.random.rand(10), B,0) + res = my_gp1.shannon_information_gain(np.random.rand(10,len(x_data[0]))) + res = my_gp1.shannon_information_gain_vec(np.random.rand(10,len(x_data[0]))) + res = my_gp1.posterior_probability(np.random.rand(10,len(x_data[0])), np.random.rand(10), B) + res = my_gp1.posterior_probability_grad(np.random.rand(10,len(x_data[0])), np.random.rand(10), B, direction = 0) + + res = my_gp1.squared_exponential_kernel(1.,1.) + res = my_gp1.squared_exponential_kernel_robust(1.,1.) + res = my_gp1.exponential_kernel(1.,1.) + res = my_gp1.exponential_kernel_robust(1.,1.) + distance = 1. + length = 1.5 + phi = 2. + l = 2. + w = 5. + p = 1. + radius = 3. + + res = my_gp1.matern_kernel_diff1(distance, length) + res = my_gp1.matern_kernel_diff1_robust(distance, phi) + res = my_gp1.matern_kernel_diff2(distance, length) + + res = my_gp1.matern_kernel_diff2_robust(distance, phi) + res = my_gp1.sparse_kernel(distance, radius) + res = my_gp1.periodic_kernel(distance, length, p) + + res = my_gp1.linear_kernel(2.,2.2, 1.,1.,1.) + res = my_gp1.dot_product_kernel(np.random.rand(2),np.random.rand(2),1.,np.array([[1.,0.],[0.,2.]])) + res = my_gp1.polynomial_kernel(np.random.rand(2),np.random.rand(2), 2) + res = my_gp1.default_kernel(x_data,x_data,np.ones((6)),my_gp1) + res = my_gp1.non_stat_kernel(x_data,x_data,np.random.rand(10,5),np.random.rand(10),0.5) + res = my_gp1.non_stat_kernel_gradient(x_data,x_data,np.random.rand(10,5),np.random.rand(10),0.5) + res = my_gp1.wendland_anisotropic(x_data,x_data,np.ones((6)), my_gp1) + +def test_train_hgdl(): + my_gp2 = GP(input_dim, x_data,y_data,init_hyperparameters = np.array([1., 1., 1., 1., 1., 1.]), noise_variances=np.zeros(y_data.shape) + 0.01, + compute_device="cpu", normalize_y = True, store_inv = True, ram_economy = True) + + + my_gp2.train(np.array([[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), + method = "hgdl", tolerance = 0.001, max_iter = 2) + + +def test_train_hgdl_async(): + my_gp2 = GP(input_dim, x_data,y_data,init_hyperparameters = np.array([1., 1., 1., 1., 1., 1.]),noise_variances=np.zeros(y_data.shape) + 0.01, + compute_device="cpu", normalize_y = True, store_inv = True, ram_economy = True) + + opt_obj = my_gp2.train_async(np.array([[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10],[0.01,10]]), + max_iter = 5000) + + time.sleep(3) + my_gp2.update_hyperparameters(opt_obj) + my_gp2.stop_training(opt_obj) + my_gp2.kill_training(opt_obj) + my_gp2.set_hyperparameters(np.array([1., 1., 1., 1., 1., 1.])) + my_gp2.get_hyperparameters() + my_gp2.get_prior_pdf() + my_gp2.test_log_likelihood_gradient(np.array([1., 1., 1., 1., 1., 1.])) + + +def test_multi_task(): + def mkernel(x1,x2,hps,obj): + d = obj._get_distance_matrix(x1,x2) + return hps[0] * obj.matern_kernel_diff1(d,hps[1]) + y_data = np.zeros((N,2)) + y_data[:,0] = np.sin(np.linalg.norm(x_data, axis=1)) + y_data[:,1] = np.cos(np.linalg.norm(x_data, axis=1)) + + my_fvgp = fvGP(input_dim,1,2, x_data, y_data, np.array([1, 1]), gp_kernel_function=mkernel) + my_fvgp.update_gp_data(x_data, y_data) + my_fvgp.train(np.array([[0.01,1],[0.01,10]]), + method = "global", pop_size = 10, tolerance = 0.001, max_iter = 2) + my_fvgp.posterior_mean(np.random.rand(2,5), x_out = np.zeros((1,1)))["f(x)"] + + +def test_gp2Scale(client): + input_dim = 1 + N = 2000 + x_data = np.random.rand(N,input_dim) + y_data = np.sin(np.linalg.norm(x_data,axis = 1) * 5.0) + hps_n = 2 + + hps_bounds = np.array([[0.1,10.], ##signal var of stat kernel + [0.001,0.02] ##length scale for stat kernel + ]) + + init_hps = np.random.uniform(size = len(hps_bounds), low = hps_bounds[:,0], high = hps_bounds[:,1]) + my_gp2S = GP(1, x_data,y_data,init_hps, gp2Scale = True, gp2Scale_batch_size= 1000, gp2Scale_dask_client=client) + + my_gp2S.train(hps_bounds, max_iter = 2, init_hyperparameters = init_hps) + + x_pred = np.linspace(0,1,1000) + mean1 = my_gp2S.posterior_mean(x_pred.reshape(-1,1))["f(x)"] + var1 = my_gp2S.posterior_covariance(x_pred.reshape(-1,1))["v(x)"]