Source code for elfi.methods.diagnostics

"""Methods for ABC diagnostics."""

import logging
from itertools import combinations

import numpy as np
from scipy.spatial import cKDTree
from scipy.special import digamma, gamma

import elfi

logger = logging.getLogger(__name__)


class TwoStageSelection:
    """Perform the summary-statistics selection proposed by Nunes and Balding (2010).

    The user can provide a list of summary statistics as list_ss, and let ELFI combine them,
    or provide some already combined summary statistics as prepared_ss.

    The rationale of the Two Stage procedure is the following:

    - First, the module computes or accepts the combinations of the candidate summary statistics.
    - In Stage 1, each summary-statistics combination is evaluated using the
      Minimum Entropy algorithm.
    - In Stage 2, the minimum-entropy combination is selected,
      and the 'closest' datasets are identified.
    - Further in Stage 2, for each summary-statistics combination,
      the mean root sum of squared errors (MRSSE) is calculated over all 'closest' datasets,
      and the minimum-MRSSE combination is chosen as the one with the optimal performance.

    References
    ----------
    [1] Nunes, M. A., & Balding, D. J. (2010). On optimal selection of summary statistics
    for approximate Bayesian computation. Statistical Applications in Genetics and
    Molecular Biology, 9(1).
    [2] Blum, M. G., Nunes, M. A., Prangle, D., & Sisson, S. A. (2013). A comparative review
    of dimension reduction methods in approximate Bayesian computation.
    Statistical Science, 28(2), 189-208.

    """

    def __init__(self, simulator, fn_distance, list_ss=None, prepared_ss=None,
                 max_cardinality=4, seed=0):
        """Initialise the summary-statistics selection for the Two Stage Procedure.

        Parameters
        ----------
        simulator : elfi.Node
            Node (often elfi.Simulator) for which the summary statistics will be applied.
            The node is the final node of a coherent ElfiModel (i.e. it has no child nodes).
        fn_distance : str or callable function
            Distance metric. See the elfi.Distance documentation for the metrics
            available as strings.
        list_ss : List of callable functions, optional
            List of candidate summary statistics.
        prepared_ss : List of lists of callable functions, optional
            List of prepared combinations of candidate summary statistics.
            No other combinations will be evaluated.
        max_cardinality : int, optional
            Maximum cardinality of a candidate summary-statistics combination.
        seed : int, optional

        """
        if list_ss is None and prepared_ss is None:
            raise ValueError('No summary statistics to assess.')

        self.simulator = simulator
        self.fn_distance = fn_distance
        self.seed = seed
        if prepared_ss is not None:
            self.ss_candidates = prepared_ss
        else:
            self.ss_candidates = self._combine_ss(list_ss, max_cardinality=max_cardinality)
        # Initialising an output pool as the rejection sampling will be used several times.
        self.pool = elfi.OutputPool(simulator.name)

    def _combine_ss(self, list_ss, max_cardinality):
        """Create all combinations of the summary statistics up to the maximum cardinality.

        Parameters
        ----------
        list_ss : List of callable functions
            List of candidate summary statistics.
        max_cardinality : int
            Maximum cardinality of a candidate summary-statistics combination.

        Returns
        -------
        List
            Combinations of candidate summary statistics.

        """
        if max_cardinality > len(list_ss):
            max_cardinality = len(list_ss)

        # Combine the candidate summary statistics.
        combinations_ss = []
        for i in range(max_cardinality):
            for combination in combinations(list_ss, i + 1):
                combinations_ss.append(combination)
        return combinations_ss
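    # Illustration (hypothetical names): for list_ss = [ss_mean, ss_var] and
    # max_cardinality=2, _combine_ss returns
    #     [(ss_mean,), (ss_var,), (ss_mean, ss_var)],
    # i.e. every non-empty subset of the candidates up to the given cardinality.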
    def run(self, n_sim, n_acc=None, n_closest=None, batch_size=1, k=4):
        """Run the Two Stage Procedure for identifying relevant summary statistics.

        Parameters
        ----------
        n_sim : int
            Total number of ABC-rejection simulations.
        n_acc : int, optional
            Number of accepted ABC-rejection simulations.
        n_closest : int, optional
            Number of 'closest' datasets
            (i.e., the n simulated datasets closest to the observations).
        batch_size : int, optional
            Number of samples per batch.
        k : int, optional
            Parameter for the kth-nearest-neighbour search performed in the
            minimum-entropy step (in Nunes & Balding, 2010 it is fixed to 4).

        Returns
        -------
        array_like
            Summary-statistics combination showing the optimal performance.

        """
        # Setting the default value of n_acc to 1% of n_sim,
        # and n_closest to 1% of n_acc, as in Nunes and Balding (2010).
        if n_acc is None:
            n_acc = int(n_sim / 100)
        if n_closest is None:
            n_closest = int(n_acc / 100)
        if n_sim < n_acc or n_acc < n_closest or n_closest == 0:
            raise ValueError("The number of simulations is too small.")

        # Find the summary-statistics combination with the minimum entropy, and
        # preserve the parameters (thetas) corresponding to the `closest' datasets.
        thetas = {}
        E_me = np.inf
        names_ss_me = []
        for set_ss in self.ss_candidates:
            names_ss = [ss.__name__ for ss in set_ss]
            thetas_ss = self._obtain_accepted_thetas(set_ss, n_sim, n_acc, batch_size)
            thetas[set_ss] = thetas_ss
            E_ss = self._calc_entropy(thetas_ss, n_acc, k)
            # If equal, dismiss the combination which contains uninformative summary statistics.
            if (E_ss == E_me and (len(names_ss_me) > len(names_ss))) or E_ss < E_me:
                E_me = E_ss
                names_ss_me = names_ss
                thetas_closest = thetas_ss[:n_closest]
            logger.info('Combination %s shows the entropy of %f' % (names_ss, E_ss))

        # Note: entropy is in the log space (negative values allowed).
        logger.info('\nThe minimum entropy of %f was found in %s.\n' % (E_me, names_ss_me))

        # Find the summary-statistics combination with
        # the minimum mean root sum of squared errors (MRSSE).
        MRSSE_min = np.inf
        names_ss_MRSSE = []
        for set_ss in self.ss_candidates:
            names_ss = [ss.__name__ for ss in set_ss]
            MRSSE_ss = self._calc_MRSSE(set_ss, thetas_closest, thetas[set_ss])
            # If equal, dismiss the combination which contains uninformative summary statistics.
            if (MRSSE_ss == MRSSE_min and (len(names_ss_MRSSE) > len(names_ss))) \
                    or MRSSE_ss < MRSSE_min:
                MRSSE_min = MRSSE_ss
                names_ss_MRSSE = names_ss
                set_ss_2stage = set_ss
            logger.info('Combination %s shows the MRSSE of %f' % (names_ss, MRSSE_ss))

        logger.info('\nThe minimum MRSSE of %f was found in %s.' % (MRSSE_min, names_ss_MRSSE))
        return set_ss_2stage
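    # Illustrative arithmetic for the defaults above: with n_sim=10000 the method
    # accepts n_acc = int(10000 / 100) = 100 parameter draws per combination, and
    # keeps the n_closest = int(100 / 100) = 1 closest dataset for the Stage-2
    # MRSSE comparison. Calling run(n_sim=1000) with the defaults would therefore
    # raise a ValueError, because n_closest evaluates to 0.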
    def _obtain_accepted_thetas(self, set_ss, n_sim, n_acc, batch_size):
        """Perform the ABC-rejection sampling and identify `closest' parameters.

        The sampling is performed using the initialised simulator.

        Parameters
        ----------
        set_ss : List
            Summary-statistics combination to be used in the rejection sampling.
        n_sim : int
            Number of rejection-sampling iterations.
        n_acc : int
            Number of accepted parameters.
        batch_size : int
            Number of samples per batch.

        Returns
        -------
        array_like
            Accepted parameters.

        """
        # Initialise the distance function.
        m = self.simulator.model.copy()
        list_ss = []
        for ss in set_ss:
            list_ss.append(elfi.Summary(ss, m[self.simulator.name], model=m))
        if isinstance(self.fn_distance, str):
            d = elfi.Distance(self.fn_distance, *list_ss, model=m)
        else:
            d = elfi.Discrepancy(self.fn_distance, *list_ss, model=m)

        # Run the simulations.
        # TODO: include different distance functions in the summary-statistics combinations.
        sampler_rejection = elfi.Rejection(d, batch_size=batch_size,
                                           seed=self.seed, pool=self.pool)
        result = sampler_rejection.sample(n_acc, n_sim=n_sim)

        # Extract the accepted parameters.
        thetas_acc = result.samples_array
        return thetas_acc

    def _calc_entropy(self, thetas_ss, n_acc, k):
        """Calculate the entropy as described in Nunes & Balding, 2010.

        E = log( pi^(q/2) / gamma(q/2+1) ) - digamma(k) + log(n)
            + q/n * sum_{i=1}^n( log(R_{i, k}) ),

        where R_{i, k} is the Euclidean distance from the parameter theta_i to
        its kth nearest neighbour;
        q is the dimensionality of the parameter; and
        n is the number of accepted parameters, n_acc, in the rejection sampling.

        Parameters
        ----------
        thetas_ss : array_like
            Parameters accepted in the rejection sampling using
            the summary-statistics combination ss.
        n_acc : int
            Number of accepted parameters.
        k : int
            Nearest neighbour to be searched.

        Returns
        -------
        float
            Entropy.

        """
        q = thetas_ss.shape[1]

        # Calculate the distance to the kth nearest neighbour across all accepted parameters.
        searcher_knn = cKDTree(thetas_ss)
        sum_log_dist_knn = 0
        for theta_ss in thetas_ss:
            dist_knn = searcher_knn.query(theta_ss, k=k)[0][-1]
            sum_log_dist_knn += np.log(dist_knn)

        # Calculate the entropy.
        E = np.log(np.pi**(q / 2) / gamma((q / 2) + 1)) - digamma(k) \
            + np.log(n_acc) + (q / n_acc) * sum_log_dist_knn
        return E

    def _calc_MRSSE(self, set_ss, thetas_obs, thetas_sim):
        """Calculate the mean root sum of squared errors (MRSSE) as in Nunes & Balding, 2010.

        MRSSE = 1/n * sum_{j=1}^n( RSSE(j) ),

        RSSE(j) = sqrt( sum_{i=1}^m( || theta_i - theta_true(j) ||^2 ) ),

        where n is the number of the `closest' datasets identified using
        the summary-statistics combination corresponding to the minimum entropy;
        m is the number of the accepted parameters in the rejection sampling for set_ss;
        theta_i is an instance of the parameters corresponding to set_ss; and
        theta_true(j) is the parameter vector corresponding to the jth `closest' dataset.

        Parameters
        ----------
        set_ss : List
            Summary-statistics combination used in the rejection sampling.
        thetas_obs : array_like
            List of parameters corresponding to the `closest' datasets.
        thetas_sim : array_like
            Parameters corresponding to set_ss.

        Returns
        -------
        float
            Mean root sum of squared errors.

        """
        RSSE_total = 0
        for theta_obs in thetas_obs:
            SSE = np.linalg.norm(thetas_sim - theta_obs)**2
            RSSE = np.sqrt(SSE)
            RSSE_total += RSSE
        MRSSE = RSSE_total / len(thetas_obs)
        return MRSSE
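
# ---------------------------------------------------------------------------
# Hedged usage sketch (not part of the module above). It illustrates how
# TwoStageSelection might be attached to a toy ELFI model. The simulator
# `toy_simulator`, the priors and the summary statistics `ss_mean`/`ss_var`
# are hypothetical placeholders, and the chosen n_sim/batch_size values are
# arbitrary; 'euclidean' is one of the string metrics accepted by elfi.Distance.
if __name__ == '__main__':

    def toy_simulator(mu, sigma, batch_size=1, random_state=None):
        # Vectorised Gaussian simulator: one row of 30 draws per batch member.
        random_state = random_state or np.random
        mu = np.asanyarray(mu).reshape(-1, 1)
        sigma = np.asanyarray(sigma).reshape(-1, 1)
        return random_state.normal(mu, sigma, size=(batch_size, 30))

    def ss_mean(y):
        # Sample mean of each simulated dataset.
        return np.mean(y, axis=1)

    def ss_var(y):
        # Sample variance of each simulated dataset.
        return np.var(y, axis=1)

    # Observed data generated from known parameters, kept with a batch dimension.
    y_obs = toy_simulator(1.0, 2.0, batch_size=1,
                          random_state=np.random.RandomState(0))

    m = elfi.ElfiModel()
    mu = elfi.Prior('uniform', -5, 10, model=m, name='mu')
    sigma = elfi.Prior('uniform', 0.5, 5, model=m, name='sigma')
    sim = elfi.Simulator(toy_simulator, mu, sigma, observed=y_obs, model=m, name='sim')

    # Stage 1 + Stage 2 selection over all non-empty subsets of {ss_mean, ss_var}.
    selection = TwoStageSelection(sim, 'euclidean', list_ss=[ss_mean, ss_var], seed=1)
    best_combination = selection.run(n_sim=10000, batch_size=1000)
    print([ss.__name__ for ss in best_combination])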