"""Implementations for acquiring locations of new evidence for Bayesian optimization."""
import logging
import numpy as np
import scipy.linalg as sl
import scipy.stats as ss
import elfi.methods.mcmc as mcmc
from elfi.methods.bo.utils import CostFunction, minimize
from elfi.methods.utils import resolve_sigmas
logger = logging.getLogger(__name__)
class AcquisitionBase:
"""All acquisition functions are assumed to fulfill this interface.
    Gaussian noise ~N(0, self.noise_var) is added to the acquired points. By default no
    noise is added (noise_var=None). A different variance can be defined for each dimension.
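
    Examples
    --------
    A minimal sketch of a custom acquisition function implementing this interface,
    assuming ``gp_model`` is a fitted surrogate such as ``elfi.GPyRegression``::

        class PosteriorMean(AcquisitionBase):
            def evaluate(self, x, t=None):
                mean, _ = self.model.predict(x)
                return mean

            def evaluate_gradient(self, x, t=None):
                grad_mean, _ = self.model.predictive_gradients(x)
                return grad_mean

        # acquire() minimises evaluate(), so this acquires points near the
        # minimum of the GP mean; noise_var spreads the batch around it.
        acq = PosteriorMean(gp_model, noise_var=0.1)
        batch = acq.acquire(n=2, t=0)  # shape (2, input_dim)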
"""
def __init__(self,
model,
prior=None,
n_inits=10,
max_opt_iters=1000,
noise_var=None,
exploration_rate=10,
seed=None,
constraints=None):
"""Initialize AcquisitionBase.
Parameters
----------
model : an object with attributes
input_dim : int
bounds : tuple of length 'input_dim' of tuples (min, max)
and methods
evaluate(x) : function that returns model (mean, var, std)
prior : scipy-like distribution, optional
By default uniform distribution within model bounds.
n_inits : int, optional
Number of initialization points in internal optimization.
max_opt_iters : int, optional
Max iterations to optimize when finding the next point.
        noise_var : float or dict, optional
            Acquisition noise variance for adding noise to the points near the optimized
            location. If a dict, it must specify a non-negative variance for each parameter
            in model.parameter_names, e.g. noise_var={'a': 0.1, 'b': 0.0}.
            Default: no added noise.
exploration_rate : float, optional
Exploration rate of the acquisition function (if supported)
seed : int, optional
Seed for getting consistent acquisition results. Used in getting random
starting locations in acquisition function optimization.
constraints : {Constraint, dict} or List of {Constraint, dict}, optional
Additional model constraints.
"""
self.model = model
self.prior = prior
self.n_inits = int(n_inits)
self.max_opt_iters = int(max_opt_iters)
self.constraints = constraints
if noise_var is not None:
self._check_noise_var(noise_var)
self.noise_var = self._transform_noise_var(noise_var)
else:
self.noise_var = noise_var
self.exploration_rate = exploration_rate
self.random_state = np.random if seed is None else np.random.RandomState(seed)
self.seed = 0 if seed is None else seed
def _check_noise_var(self, noise_var):
if isinstance(noise_var, dict):
            if set(noise_var) != set(self.model.parameter_names):
                raise ValueError("Acquisition noise dictionary should contain all parameters.")
            if not all(isinstance(x, (int, float)) for x in noise_var.values()):
                raise ValueError("Acquisition noise dictionary values "
                                 "should all be int or float.")
            if any(x < 0 for x in noise_var.values()):
                raise ValueError("Acquisition noise values should all be "
                                 "non-negative int or float.")
elif isinstance(noise_var, (int, float)):
if noise_var < 0:
raise ValueError("Acquisition noise should be non-negative int or float.")
        else:
            raise ValueError("Acquisition noise must be a non-negative float or "
                             "a dictionary of non-negative floats specifying the "
                             "variance for each parameter dimension.")
def _transform_noise_var(self, noise_var):
if isinstance(noise_var, (float, int)):
return noise_var
        # Return the noise variances as a list ordered to match the model's
        # parameter_names.
if isinstance(noise_var, dict):
return list(map(noise_var.get, self.model.parameter_names))
def evaluate(self, x, t=None):
"""Evaluate the acquisition function at 'x'.
Parameters
----------
x : numpy.array
t : int
current iteration (starting from 0)
"""
raise NotImplementedError
def evaluate_gradient(self, x, t=None):
"""Evaluate the gradient of acquisition function at 'x'.
Parameters
----------
x : numpy.array
t : int
Current iteration (starting from 0).
"""
raise NotImplementedError
def acquire(self, n, t=None):
"""Return the next batch of acquisition points.
Gaussian noise ~N(0, self.noise_var) is added to the acquired points.
Parameters
----------
n : int
Number of acquisition points to return.
t : int
Current acq_batch_index (starting from 0).
Returns
-------
x : np.ndarray
The shape is (n, input_dim)
"""
logger.debug('Acquiring the next batch of %d values', n)
        # Minimise the acquisition function to find the location of the next point
def obj(x):
return self.evaluate(x, t)
def grad_obj(x):
return self.evaluate_gradient(x, t)
xhat, _ = minimize(
obj,
self.model.bounds,
method='L-BFGS-B' if self.constraints is None else 'SLSQP',
constraints=self.constraints,
grad=grad_obj,
prior=self.prior,
n_start_points=self.n_inits,
maxiter=self.max_opt_iters,
random_state=self.random_state)
# Create n copies of the minimum
x = np.tile(xhat, (n, 1))
# Add noise for more efficient fitting of GP
x = self._add_noise(x)
return x
def _add_noise(self, x):
# Add noise for more efficient fitting of GP
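        # The noise is drawn from a truncated normal so that the noisy points
        # remain within the model bounds.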
if self.noise_var is not None:
noise_var = np.asanyarray(self.noise_var)
if noise_var.ndim == 0:
noise_var = np.tile(noise_var, self.model.input_dim)
for i in range(self.model.input_dim):
std = np.sqrt(noise_var[i])
if std == 0:
continue
xi = x[:, i]
a = (self.model.bounds[i][0] - xi) / std
b = (self.model.bounds[i][1] - xi) / std
x[:, i] = ss.truncnorm.rvs(
a, b, loc=xi, scale=std, size=len(x), random_state=self.random_state)
return x
class LCBSC(AcquisitionBase):
r"""Lower Confidence Bound Selection Criterion.
Srinivas et al. call this GP-LCB.
LCBSC uses the parameter delta which is here equivalent to 1/exploration_rate.
    Parameter delta should be in (0, 1) for the theoretical results to hold. The
    theoretical upper bound for total regret in Srinivas et al. holds with probability
    greater than or equal to 1 - delta, so values of delta very close to or above 1
    do not make much sense in that respect.
Delta is roughly the exploitation tendency of the acquisition function.
References
----------
N. Srinivas, A. Krause, S. M. Kakade, and M. Seeger. Gaussian
process optimization in the bandit setting: No regret and experimental design. In
Proc. International Conference on Machine Learning (ICML), 2010
E. Brochu, V.M. Cora, and N. de Freitas. A tutorial on Bayesian optimization of expensive
cost functions, with application to active user modeling and hierarchical reinforcement
learning. arXiv:1012.2599, 2010.
Notes
-----
The formula presented in Brochu (pp. 15) seems to be from Srinivas et al. Theorem 2.
However, instead of having t**(d/2 + 2) in \beta_t, it seems that the correct form
would be t**(2d + 2).
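
    As implemented in ``evaluate``, the criterion minimised is

    .. math:: \text{LCB}(x; t) = \mu_{1:t}(x) - \sqrt{\beta_t v_{1:t}(x)},

    where :math:`\mu_{1:t}` and :math:`v_{1:t}` are the posterior mean and variance
    of the Gaussian process and, with the iteration index :math:`t` counted from 1,
    :math:`\beta_t = 2 \log(t^{2d + 2} \pi^2 / (3 \delta))`.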
"""
def __init__(self, *args, delta=None, additive_cost=None, **kwargs):
"""Initialize LCBSC.
Parameters
----------
delta: float, optional
In between (0, 1). Default is 1/exploration_rate. If given, overrides the
exploration_rate.
additive_cost: CostFunction, optional
Cost function output is added to the base acquisition value.
"""
if delta is not None:
if delta <= 0 or delta >= 1:
logger.warning('Parameter delta should be in the interval (0,1)')
kwargs['exploration_rate'] = 1 / delta
super(LCBSC, self).__init__(*args, **kwargs)
self.name = 'lcbsc'
self.label_fn = 'Confidence Bound'
if additive_cost is not None and not isinstance(additive_cost, CostFunction):
raise TypeError("Additive cost must be type CostFunction.")
self.additive_cost = additive_cost
@property
def delta(self):
"""Return the inverse of exploration rate."""
return 1 / self.exploration_rate
def _beta(self, t):
        # Iterations are indexed from 0, so shift t by one for the log term
        t += 1
d = self.model.input_dim
return 2 * np.log(t**(2 * d + 2) * np.pi**2 / (3 * self.delta))
    def evaluate(self, x, t=None):
"""Evaluate the Lower confidence bound selection criterion.
Parameters
----------
x: np.ndarray
t: int, optional
Current iteration (starting from 0).
Returns
-------
np.ndarray
"""
mean, var = self.model.predict(x, noiseless=True)
value = mean - np.sqrt(self._beta(t) * var)
if self.additive_cost is not None:
value += self.additive_cost.evaluate(x)
return value
    def evaluate_gradient(self, x, t=None):
"""Evaluate the gradient of the lower confidence bound selection criterion.
Parameters
----------
x: np.ndarray
t: int, optional
Current iteration (starting from 0).
Returns
-------
np.ndarray
"""
mean, var = self.model.predict(x, noiseless=True)
grad_mean, grad_var = self.model.predictive_gradients(x)
value = grad_mean - 0.5 * grad_var * np.sqrt(self._beta(t) / var)
if self.additive_cost is not None:
value += self.additive_cost.evaluate_gradient(x)
return value
class MaxVar(AcquisitionBase):
r"""The maximum variance acquisition method.
    The next evaluation point is acquired at the maximiser of the variance of
    the unnormalised approximate posterior.
.. math:: \theta_{t+1} = \arg \max \text{Var}(p(\theta) \cdot p_a(\theta)),
where the unnormalised likelihood :math:`p_a` is defined
    using the CDF of the normal distribution, :math:`\Phi`, as follows:
.. math:: p_a(\theta) = \Phi((\epsilon - \mu_{1:t}(\theta)) /
\sqrt{v_{1:t}(\theta) + \sigma^2_n}),
    where :math:`\epsilon` is the ABC threshold, :math:`\mu_{1:t}` and :math:`v_{1:t}` are
    determined by the Gaussian process, and :math:`\sigma^2_n` is the noise variance.
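
    Notes
    -----
    In ``evaluate``, the variance of the unnormalised likelihood :math:`p_a` is
    computed via the CDF of the skew-normal distribution, :math:`\Phi_{SN}`, which
    avoids evaluating Owen's T function explicitly:

    .. math:: \text{Var}(p_a(\theta)) = \Phi_{SN}(\epsilon; a, \mu_{1:t}, s)
        - \Phi(\epsilon; \mu_{1:t}, s)^2,

    where :math:`s = \sqrt{v_{1:t} + \sigma^2_n}` is the scale and
    :math:`a = \sqrt{\sigma^2_n} / \sqrt{\sigma^2_n + 2 v_{1:t}}` the skewness
    parameter. The result is multiplied by the squared prior density to obtain
    the variance of the unnormalised approximate posterior.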
References
----------
    Järvenpää et al. (2019). Efficient Acquisition Rules for Model-Based
    Approximate Bayesian Computation. Bayesian Analysis 14(2):595-622.
https://projecteuclid.org/euclid.ba/1537258134
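
    Examples
    --------
    A sketch of typical use, assuming a fitted surrogate ``gp_model`` (for example
    an ``elfi.GPyRegression``) and a scipy-like ``prior``::

        acq = MaxVar(gp_model, prior, quantile_eps=.01)
        theta_next = acq.acquire(n=1)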
"""
def __init__(self, model, prior, quantile_eps=.01, **opts):
"""Initialise MaxVar.
Parameters
----------
model : elfi.GPyRegression
Gaussian process model used to calculate the unnormalised approximate likelihood.
prior : scipy-like distribution
Prior distribution.
        quantile_eps : float, optional
Quantile of the observed discrepancies used in setting the ABC threshold.
"""
super(MaxVar, self).__init__(model, prior=prior, **opts)
self.name = 'max_var'
self.label_fn = 'Variance of the Unnormalised Approximate Posterior'
self.quantile_eps = quantile_eps
# The ABC threshold is initialised to a pre-set value as the gp is not yet fit.
self.eps = .1
    def acquire(self, n, t=None):
"""Acquire a batch of acquisition points.
Parameters
----------
n : int
Number of acquisitions.
t : int, optional
            Current iteration (unused).
Returns
-------
array_like
Coordinates of the yielded acquisition points.
"""
logger.debug('Acquiring the next batch of %d values', n)
gp = self.model
# Updating the ABC threshold.
self.eps = np.percentile(gp.Y, self.quantile_eps * 100)
def _negate_eval(theta):
return -self.evaluate(theta)
def _negate_eval_grad(theta):
return -self.evaluate_gradient(theta)
# Obtaining the location where the variance is maximised.
theta_max, _ = minimize(_negate_eval,
gp.bounds,
grad=_negate_eval_grad,
prior=self.prior,
n_start_points=self.n_inits,
maxiter=self.max_opt_iters,
random_state=self.random_state)
# Using the same location for all points in theta batch.
batch_theta = np.tile(theta_max, (n, 1))
return batch_theta
    def evaluate(self, theta_new, t=None):
"""Evaluate the acquisition function at the location theta_new.
Parameters
----------
theta_new : array_like
Evaluation coordinates.
t : int, optional
            Current iteration (unused).
Returns
-------
array_like
Variance of the approximate posterior.
"""
mean, var = self.model.predict(theta_new, noiseless=True)
sigma2_n = self.model.noise
        # Using the CDF of the skew-normal distribution to avoid explicit computation
        # of Owen's T function.
a = np.sqrt(sigma2_n) / np.sqrt(sigma2_n + 2. * var) # Skewness.
scale = np.sqrt(sigma2_n + var)
phi_skew = ss.skewnorm.cdf(self.eps, a, loc=mean, scale=scale)
phi_norm = ss.norm.cdf(self.eps, loc=mean, scale=scale)
var_p_a = phi_skew - phi_norm**2
val_prior = self.prior.pdf(theta_new).ravel()[:, np.newaxis]
var_approx_posterior = val_prior**2 * var_p_a
return var_approx_posterior
    def evaluate_gradient(self, theta_new, t=None):
"""Evaluate the acquisition function's gradient at the location theta_new.
Parameters
----------
theta_new : array_like
Evaluation coordinates.
t : int, optional
            Current iteration (unused).
Returns
-------
array_like
            Gradient of the variance of the approximate posterior.
"""
phi = ss.norm.cdf
mean, var = self.model.predict(theta_new, noiseless=True)
grad_mean, grad_var = self.model.predictive_gradients(theta_new)
sigma2_n = self.model.noise
scale = np.sqrt(sigma2_n + var)
a = (self.eps - mean) / scale
b = np.sqrt(sigma2_n) / np.sqrt(sigma2_n + 2 * var)
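        # Here a is the threshold standardised by the scale and b the skewness
        # parameter of the corresponding skew-normal distribution.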
grad_a = (-1. / scale) * grad_mean - \
((self.eps - mean) / (2. * (sigma2_n + var)**(1.5))) * grad_var
grad_b = (-np.sqrt(sigma2_n) / (sigma2_n + 2 * var)**(1.5)) * grad_var
_phi_a = phi(a)
int_1 = _phi_a - _phi_a**2
int_2 = phi(self.eps, loc=mean, scale=scale) \
- ss.skewnorm.cdf(self.eps, b, loc=mean, scale=scale)
grad_int_1 = (1. - 2 * _phi_a) * \
(np.exp(-.5 * (a**2)) / np.sqrt(2. * np.pi)) * grad_a
grad_int_2 = (1. / np.pi) * \
(((np.exp(-.5 * (a**2) * (1. + b**2))) / (1. + b**2)) * grad_b
+ (np.sqrt(np.pi / 2.) * np.exp(-.5 * (a**2)) * (1. - 2. * phi(a * b)) * grad_a))
# Obtaining the gradient prior by applying the following rule:
# (log f(x))' = f'(x)/f(x) => f'(x) = (log f(x))' * f(x)
term_prior = self.prior.pdf(theta_new).ravel()[:, np.newaxis]
grad_prior_log = self.prior.gradient_logpdf(theta_new)
term_grad_prior = term_prior * grad_prior_log
gradient = 2. * term_prior * (int_1 - int_2) * term_grad_prior + \
term_prior**2 * (grad_int_1 - grad_int_2)
return gradient
class RandMaxVar(MaxVar):
r"""The randomised maximum variance acquisition method.
    The next evaluation point is sampled from the density corresponding to the
    variance of the unnormalised approximate posterior (the MaxVar acquisition function).
.. math:: \theta_{t+1} \thicksim q(\theta),
where :math:`q(\theta) \propto \text{Var}(p(\theta) \cdot p_a(\theta))` and
the unnormalised likelihood :math:`p_a` is defined
    using the CDF of the normal distribution, :math:`\Phi`, as follows:
.. math:: p_a(\theta) = \Phi((\epsilon - \mu_{1:t}(\theta)) /
\sqrt{v_{1:t}(\theta) + \sigma^2_n} ),
    where :math:`\epsilon` is the ABC threshold, :math:`\mu_{1:t}` and :math:`v_{1:t}` are
    determined by the Gaussian process, and :math:`\sigma^2_n` is the noise variance.
References
----------
    Järvenpää et al. (2019). Efficient Acquisition Rules for Model-Based
    Approximate Bayesian Computation. Bayesian Analysis 14(2):595-622.
https://projecteuclid.org/euclid.ba/1537258134
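
    Examples
    --------
    A sketch of typical use, assuming a fitted surrogate ``gp_model`` and a
    scipy-like ``prior``; the acquisition is drawn with MCMC (here Metropolis)::

        acq = RandMaxVar(gp_model, prior, sampler='metropolis', n_samples=100)
        theta_next = acq.acquire(n=1)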
"""
def __init__(self, model, prior, quantile_eps=.01, sampler='nuts', n_samples=50, warmup=None,
limit_faulty_init=1000, init_from_prior=False, sigma_proposals=None, **opts):
"""Initialise RandMaxVar.
Parameters
----------
model : elfi.GPyRegression
Gaussian process model used to calculate the unnormalised approximate likelihood.
prior : scipy-like distribution
Prior distribution.
        quantile_eps : float, optional
Quantile of the observed discrepancies used in setting the ABC threshold.
sampler : string, optional
Name of the sampler (options: metropolis, nuts).
n_samples : int, optional
Length of the sampler's chain for obtaining the acquisitions.
warmup : int, optional
Number of samples discarded as warmup. Defaults to n_samples/2.
limit_faulty_init : int, optional
Limit for the iterations used to obtain the sampler's initial points.
        init_from_prior : bool, optional
            Whether the sampler's initial points are sampled from the prior instead of
            a uniform distribution within the model bounds. Defaults to False
            (uniform within the bounds).
sigma_proposals : dict, optional
Standard deviations for Gaussian proposals of each parameter for Metropolis
Markov Chain sampler. Defaults to 1/10 of surrogate model bound lengths.
"""
super(RandMaxVar, self).__init__(model, prior, quantile_eps, **opts)
self.name = 'rand_max_var'
self.name_sampler = sampler
self._n_samples = n_samples
self._warmup = warmup or n_samples // 2
self._limit_faulty_init = limit_faulty_init
self._init_from_prior = init_from_prior
if self.name_sampler == 'metropolis':
self._sigma_proposals = resolve_sigmas(self.model.parameter_names,
sigma_proposals,
self.model.bounds)
    def acquire(self, n, t=None):
"""Acquire a batch of acquisition points.
Parameters
----------
n : int
Number of acquisitions.
t : int, optional
            Current iteration (unused).
Returns
-------
array_like
Coordinates of the yielded acquisition points.
"""
        if n > self._n_samples - self._warmup:
            raise ValueError(("The number of acquisitions ({0}) has to be lower than the number "
                              "of samples left after warmup ({1}).").format(
                                  n, self._n_samples - self._warmup))
logger.debug('Acquiring the next batch of %d values', n)
gp = self.model
# Updating the ABC threshold.
self.eps = np.percentile(gp.Y, self.quantile_eps * 100)
def _evaluate_gradient_logpdf(theta):
denominator = self.evaluate(theta)
if denominator == 0:
return -np.inf
pt_eval = self.evaluate_gradient(theta) / denominator
return pt_eval.ravel()
def _evaluate_logpdf(theta):
val_pdf = self.evaluate(theta)
if val_pdf == 0:
return -np.inf
return np.log(val_pdf)
batch_theta = np.zeros(shape=len(gp.bounds))
# Obtaining the RandMaxVar acquisition.
for i in range(self._limit_faulty_init + 1):
if i == self._limit_faulty_init:
raise SystemExit("Unable to find a suitable initial point.")
# Proposing the initial point.
if self._init_from_prior:
theta_init = self.prior.rvs(random_state=self.random_state)
for idx_param, bound in enumerate(gp.bounds):
theta_init[idx_param] = np.clip(theta_init[idx_param], bound[0], bound[1])
else:
theta_init = np.zeros(shape=len(gp.bounds))
for idx_param, bound in enumerate(gp.bounds):
theta_init[idx_param] = self.random_state.uniform(bound[0], bound[1])
# Refusing to accept a faulty initial point.
if np.isinf(_evaluate_logpdf(theta_init)):
continue
# Sampling the acquisition using the chosen sampler.
if self.name_sampler == 'metropolis':
samples = mcmc.metropolis(self._n_samples,
theta_init,
_evaluate_logpdf,
sigma_proposals=self._sigma_proposals,
seed=self.seed)
elif self.name_sampler == 'nuts':
samples = mcmc.nuts(self._n_samples,
theta_init,
_evaluate_logpdf,
_evaluate_gradient_logpdf,
seed=self.seed)
else:
raise ValueError(
"Incompatible sampler. Please check the options in the documentation.")
if n > 1:
# Remove warmup samples and return n random points
samples = samples[self._warmup:]
batch_theta = self.random_state.permutation(samples)[:n]
else:
# Return the last point
batch_theta = samples[-1:]
break
return batch_theta
class ExpIntVar(MaxVar):
r"""The Expected Integrated Variance (ExpIntVar) acquisition method.
Essentially, we define a loss function that measures the overall uncertainty
in the unnormalised ABC posterior over the parameter space.
The value of the loss function depends on the next simulation and thus
the next evaluation location :math:`\theta^*` is chosen to minimise the expected loss.
    .. math:: \theta_{t+1} = \arg \min_{\theta^* \in \Theta} L_{1:t}(\theta^*),
where :math:`\Theta` is the parameter space, and :math:`L` is the expected loss
function approximated as follows:
    .. math:: L_{1:t}(\theta^*) \approx 2 \sum_{i=1}^s \omega^i \cdot p^2(\theta^i)
        \cdot w_{1:t+1}(\theta^i, \theta^*),
where :math:`\omega^i` is an importance weight,
:math:`p^2(\theta^i)` is the prior squared, and
    :math:`w_{1:t+1}(\theta^i, \theta^*)` is the expected variance of the unnormalised ABC
    posterior at :math:`\theta^i` after running the simulation model with parameter
    :math:`\theta^*`.
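
    For grid integration the weights are uniform, :math:`\omega^i = 1/s`, whereas for
    importance sampling they are proportional to the inverse of the MaxVar criterion,
    :math:`\omega^i \propto 1 / \text{Var}(p(\theta^i) \cdot p_a(\theta^i))`, normalised
    to sum to one (see ``acquire``).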
References
----------
    Järvenpää et al. (2019). Efficient Acquisition Rules for Model-Based
    Approximate Bayesian Computation. Bayesian Analysis 14(2):595-622.
https://projecteuclid.org/euclid.ba/1537258134
"""
def __init__(self, model, prior, quantile_eps=.01, integration='grid', d_grid=.2,
n_samples_imp=100, iter_imp=2, sampler='nuts', n_samples=2000,
sigma_proposals=None, **opts):
"""Initialise ExpIntVar.
Parameters
----------
model : elfi.GPyRegression
Gaussian process model used to calculate the approximate unnormalised likelihood.
prior : scipy-like distribution
Prior distribution.
        quantile_eps : float, optional
Quantile of the observed discrepancies used in setting the discrepancy threshold.
integration : str, optional
Integration method. Options:
- grid (points are taken uniformly): more accurate yet
computationally expensive in high dimensions;
- importance (points are taken based on the importance weight): less accurate though
applicable in high dimensions.
        d_grid : float, optional
            Step size of the integration grid in each dimension. Used when
            integration='grid'.
n_samples_imp : int, optional
Number of importance samples.
        iter_imp : int, optional
            Number of acquisition iterations between consecutive importance-sampling
            steps. Used when integration='importance'.
sampler : string, optional
Sampler for generating random numbers from the proposal distribution for IS.
(Options: metropolis, nuts.)
n_samples : int, optional
Chain length for the sampler that generates the random numbers
from the proposal distribution for IS.
sigma_proposals : dict, optional
Standard deviations for Gaussian proposals of each parameter for Metropolis
Markov Chain sampler. Defaults to 1/10 of surrogate model bound lengths.
"""
super(ExpIntVar, self).__init__(model, prior, quantile_eps, **opts)
self.name = 'exp_int_var'
self.label_fn = 'Expected Loss'
self._integration = integration
self._n_samples_imp = n_samples_imp
self._iter_imp = iter_imp
if self._integration == 'importance':
self.density_is = RandMaxVar(model=self.model,
prior=self.prior,
n_inits=self.n_inits,
seed=self.seed,
quantile_eps=self.quantile_eps,
sampler=sampler,
n_samples=n_samples,
sigma_proposals=sigma_proposals)
elif self._integration == 'grid':
grid_param = [slice(b[0], b[1], d_grid) for b in self.model.bounds]
self.points_int = np.mgrid[grid_param].reshape(len(self.model.bounds), -1).T
    def acquire(self, n, t):
"""Acquire a batch of acquisition points.
Parameters
----------
n : int
Number of acquisitions.
t : int
Current iteration.
Returns
-------
array_like
Coordinates of the yielded acquisition points.
"""
logger.debug('Acquiring the next batch of %d values', n)
gp = self.model
self.sigma2_n = gp.noise
# Updating the discrepancy threshold.
self.eps = np.percentile(gp.Y, self.quantile_eps * 100)
# Performing the importance sampling step every self._iter_imp iterations.
if self._integration == 'importance' and t % self._iter_imp == 0:
self.points_int = self.density_is.acquire(self._n_samples_imp)
# Obtaining the omegas_int and priors_int terms to be used in the evaluate function.
self.mean_int, self.var_int = gp.predict(self.points_int, noiseless=True)
self.priors_int = (self.prior.pdf(self.points_int)**2)[np.newaxis, :]
if self._integration == 'importance' and t % self._iter_imp == 0:
omegas_int_unnormalised = (1 / MaxVar.evaluate(self, self.points_int)).T
self.omegas_int = omegas_int_unnormalised / \
np.sum(omegas_int_unnormalised, axis=1)[:, np.newaxis]
elif self._integration == 'grid':
self.omegas_int = np.empty(len(self.points_int))
self.omegas_int.fill(1 / len(self.points_int))
# Initialising the attributes used in the evaluate function.
self.thetas_old = np.array(gp.X)
self._K = gp._gp.kern.K
self.K = self._K(self.thetas_old, self.thetas_old) + \
self.sigma2_n * np.identity(self.thetas_old.shape[0])
self.k_int_old = self._K(self.points_int, self.thetas_old).T
self.phi_int = ss.norm.cdf(self.eps, loc=self.mean_int.T,
scale=np.sqrt(self.sigma2_n + self.var_int.T))
# Obtaining the location where the expected loss is minimised.
# Note: The gradient is computed numerically as GPy currently does not
# directly provide the derivative computations used in Järvenpää et al., 2017.
theta_min, _ = minimize(self.evaluate,
gp.bounds,
grad=None,
prior=self.prior,
n_start_points=self.n_inits,
maxiter=self.max_opt_iters,
random_state=self.random_state)
# Using the same location for all points in the batch.
batch_theta = np.tile(theta_min, (n, 1))
return batch_theta
    def evaluate(self, theta_new, t=None):
"""Evaluate the acquisition function at the location theta_new.
Parameters
----------
theta_new : array_like
Evaluation coordinates.
t : int, optional
            Current iteration (unused).
Returns
-------
array_like
Expected loss's term dependent on theta_new.
"""
gp = self.model
        _, n_dim = self.points_int.shape
# Alter the shape of theta_new.
if n_dim != 1 and theta_new.ndim == 1:
theta_new = theta_new[np.newaxis, :]
elif n_dim == 1 and theta_new.ndim == 1:
theta_new = theta_new[:, np.newaxis]
# Calculate the integrand term w.
# Note: w's second term (given in Järvenpää et al., 2017) is dismissed
# because it is constant with respect to theta_new.
_, var_new = gp.predict(theta_new, noiseless=True)
k_old_new = self._K(self.thetas_old, theta_new)
k_int_new = self._K(self.points_int, theta_new).T
# Using the Cholesky factorisation to avoid computing matrix inverse.
term_chol = sl.cho_solve(sl.cho_factor(self.K), k_old_new)
cov_int = k_int_new - np.dot(self.k_int_old.T, term_chol).T
delta_var_int = cov_int**2 / (self.sigma2_n + var_new)
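        # delta_var_int is the reduction in the GP variance at the integration
        # points that would result from a new evaluation at theta_new.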
a = np.sqrt((self.sigma2_n + self.var_int.T - delta_var_int)
/ (self.sigma2_n + self.var_int.T + delta_var_int))
        # Using the skew-normal CDF as a substitute for Owen's T function.
phi_skew_imp = ss.skewnorm.cdf(self.eps, a, loc=self.mean_int.T,
scale=np.sqrt(self.sigma2_n + self.var_int.T))
w = ((self.phi_int - phi_skew_imp) / 2)
loss_theta_new = 2 * np.sum(self.omegas_int * self.priors_int * w, axis=1)
loss_theta_new = np.where(self.prior.pdf(theta_new) == 0,
np.finfo(float).max,
loss_theta_new)
return loss_theta_new