Source code for conditional_independence.ci_tests.nonparametric.kci

import numpy as np
from sklearn.preprocessing import scale
from sklearn.metrics.pairwise import euclidean_distances
import itertools as itr
from scipy.stats import gamma
from typing import Dict, Union, List
from ...utils import kernels
from ...utils import residuals


def ki_test_vector(
        Y: np.ndarray,
        X: np.ndarray,
        width_x: float = 0.,
        width_y: float = 0.,
        alpha: float = 0.05,
        gamma_approx: bool = True,
        n_draws: int = 500,
        lam: float = 1e-3,
        thresh: float = 1e-5,
        num_eig: int = 0,
        catgorical_x: bool = False
):
    """
    Test the null hypothesis that Y and X are independent

    :param Y: (n*_) matrix
    :param X: (n*_) matrix
    :param width_x: Kernel width. If 0, chosen automatically.
    :param width_y: Kernel width. If 0, chosen automatically.
    :param alpha: Significance level
    :param gamma_approx: If True, approximate the null distribution by a Gamma distribution. Otherwise, use a Monte
        Carlo approximation.
    :param n_draws:
    :param lam:
    :param thresh:
    :param num_eig:
    :return:
    """
    # === ASSIGN VARIABLES USED THROUGHOUT METHOD ===
    if X.ndim == 1:
        X = X.reshape((len(X), 1))
    if Y.ndim == 1:
        Y = Y.reshape((len(Y), 1))
    n = X.shape[0]
    if Y.shape[0] != n:
        raise ValueError("Y should have the same number of samples as X")

    # === CREATE KERNEL MATRICES ===
    if catgorical_x:
        kx = kernels.delta_kernel(X)
    else:
        if width_x == 0:
            width_x = np.median(euclidean_distances(X))
        X = scale(X)
        kernel_precision_x = 1 / (width_x ** 2)  # TODO: CHECK
        kx = kernels.rbf_kernel(X, kernel_precision_x)
    if width_y == 0:
        width_y = np.median(euclidean_distances(Y))
    Y = scale(Y)
    kernel_precision_y = 1 / (width_y ** 2)  # TODO: CHECK

    H = np.eye(n) - np.ones([n, n]) / n
    kx = H @ kx @ H
    ky = kernels.rbf_kernel(Y, kernel_precision_y)
    ky = H @ ky @ H

    # === COMPUTE STATISTIC ====
    statistic = np.sum(kx * ky.T) / n  # same as trace of product

    # === COMPUTE NULL DISTRIBUTION ====
    if not gamma_approx:
        raise NotImplementedError
    else:
        mean_approx = 1 / n ** 2 * np.trace(kx) * np.trace(ky)
        var_approx = 2 / n ** 4 * np.sum(kx * kx) * np.sum(ky * ky)
        # k is shape, theta is scale
        k_approx = mean_approx ** 2 / var_approx
        prec_approx = var_approx / mean_approx

        critval = gamma.ppf(1 - alpha, k_approx, scale=prec_approx)
        pval = 1 - gamma.cdf(statistic, k_approx, scale=prec_approx)

    return dict(statistic=statistic, critval=critval, p_value=pval, reject=statistic > critval)


def kci_test_vector(
        Y: np.array,
        E: np.array,
        X: np.array,
        width: float = 0.,
        alpha: float = 0.05,
        unbiased: bool = False,
        gamma_approx: bool = True,
        n_draws: int = 500,
        lam: float = 1e-3,
        thresh: float = 1e-5,
        num_eig: int = 0,
        catgorical_e: bool = False
) -> Dict:
    """
    Test the null hypothesis that Y and E are independent given X.

    :param Y: (n*_) matrix
    :param E: (n*_) matrix
    :param X: (n*d) matrix
    :param width: Kernel width. If 0, chosen automatically.
    :param alpha: Significance level
    :param unbiased: Whether bias correction should be applied.
    :param gamma_approx: If True, approximate the null distribution by a Gamma distribution. Otherwise, use a Monte
        Carlo approximation.
    :param n_draws: Number of draws in Monte Carlo approach if gamma_approx=False
    :param lam: Regularization parameter for matrix inversions
    :param thresh: Lower threshold for eigenvalues
    :return: (statistic, critval, pval). The p-value for the null hypothesis that Y and E are independent given X.
    """
    # ASSIGN VARIABLES USED THROUGHOUT METHOD
    if X.ndim == 1:
        X = X.reshape((len(X), 1))
    if Y.ndim == 1:
        Y = Y.reshape((len(Y), 1))
    if E.ndim == 1:
        E = E.reshape((len(E), 1))
    n, d = X.shape
    if Y.shape[0] != n:
        raise ValueError("Y should have the same number of samples as X")
    if E.shape[0] != n:
        raise ValueError("E should have the same number of samples as X and Y")

    Y = scale(Y)
    X = scale(X)
    if width == 0:
        if n <= 200:
            width = 0.8
        elif n < 1200:
            width = 0.5
        else:
            width = 0.3
    if num_eig == 0:
        num_eig = n
    kernel_precision = 1 / (width ** 2 * d)

    if catgorical_e:
        ke = kernels.delta_kernel(E)
    else:
        E = scale(E)
        ke = kernels.rbf_kernel(E, kernel_precision)

    # === CREATE KERNEL MATRICES ===
    H = np.eye(n) - np.ones([n, n]) / n

    kyx = kernels.rbf_kernel(np.concatenate((Y, X / 2), axis=1), kernel_precision)
    kyx = H @ kyx @ H  # Centralize Kyx

    ke = H @ ke @ H  # Centralize Ke

    kx = kernels.rbf_kernel(X, kernel_precision)
    kx = H @ kx @ H  # Centralize Kx

    rx = np.eye(n) - kx @ np.linalg.inv(kx + lam * np.eye(n))
    kyx = rx @ kyx @ rx.T  # Equation (11)
    kex = rx @ ke @ rx.T  # Equation (12)

    statistic = np.sum(kyx * kex.T)
    dfE = np.sum(np.diag(np.eye(n) - rx))

    # === CALCULATE EIGENVALUES AND EIGENVECTORS ===
    eigvecs_kyx, eigvals_kyx, _ = np.linalg.svd((kyx + kyx.T) / 2)
    eigvals_kyx = eigvals_kyx[:num_eig]
    eigvecs_kyx = eigvecs_kyx[:, :num_eig]
    eigvecs_kex, eigvals_kex, _ = np.linalg.svd((kex + kex.T) / 2)
    eigvals_kex = eigvals_kex[:num_eig]
    eigvecs_kex = eigvecs_kex[:, :num_eig]

    # === THRESHOLD EIGENVALUES AND EIGENVECTORS ===
    ixs_yx = eigvals_kyx > np.max(eigvals_kyx) * thresh
    eigvals_kyx = eigvals_kyx[ixs_yx]
    eigvecs_kyx = eigvecs_kyx[:, ixs_yx]
    ixs_ex = eigvals_kex > np.max(eigvals_kex) * thresh
    eigvals_kex = eigvals_kex[ixs_ex]
    eigvecs_kex = eigvecs_kex[:, ixs_ex]

    # === CALCULATE PRODUCT OF EIGENVECTORS WITH SQUARE ROOT OF EIGENVALUES
    eigprod_kyx = eigvecs_kyx * np.sqrt(eigvals_kyx)[None, :]  # TODO: CHECK
    eigprod_kex = eigvecs_kex * np.sqrt(eigvals_kex)[None, :]  # TODO: CHECK

    # === CALCULATE W ===
    d_yx = eigprod_kyx.shape[1]
    d_ex = eigprod_kex.shape[1]

    w = np.zeros([d_yx * d_ex, n])
    for i, j in itr.product(range(d_yx), range(d_ex)):
        w[(i - 1) * d_ex + j] = eigprod_kyx[:, i] * eigprod_kex[:, j]  # TODO: CHECK
    ww = w @ w.T if d_yx * d_ex < n else w.T @ w

    if not gamma_approx:
        # TODO
        raise NotImplementedError
    else:
        mean_approx = np.sum(np.diag(ww))
        var_approx = 2 * np.sum(np.diag(ww ** 2))
        k_approx = mean_approx ** 2 / var_approx
        prec_approx = var_approx / mean_approx

        critval = gamma.ppf(1 - alpha, k_approx, scale=prec_approx)
        pval = 1 - gamma.cdf(statistic, k_approx, scale=prec_approx)

    return dict(statistic=statistic, critval=critval, p_value=pval, reject=statistic > critval)


[docs]def kci_test( suffstat: np.array, i, j, cond_set: Union[List[int], int] = None, width: float = 0., alpha: float = 0.05, unbiased: bool = False, gamma_approx: bool = True, regress: bool = True, n_draws: int = 500, lam: float = 1e-3, thresh: float = 1e-5, num_eig: int = 0, categorical_e: bool = False ) -> Dict: """ TODO """ if isinstance(cond_set, int): cond_set = [cond_set] if cond_set is None or len(cond_set) == 0: return ki_test_vector( suffstat[:, i], suffstat[:, j], width_x=width, width_y=width, alpha=alpha, gamma_approx=gamma_approx, n_draws=n_draws, lam=lam, thresh=thresh, num_eig=num_eig, catgorical_x=categorical_e ) else: if regress: residuals_i, residuals_j = residuals(suffstat, i, j, cond_set) return ki_test_vector( residuals_i, residuals_j, width_x=width, width_y=width, alpha=alpha, gamma_approx=gamma_approx, n_draws=n_draws, lam=lam, thresh=thresh, num_eig=num_eig, catgorical_x=categorical_e ) else: return kci_test_vector( suffstat[:, i], suffstat[:, j], suffstat[:, cond_set], width=width, alpha=alpha, unbiased=unbiased, gamma_approx=gamma_approx, n_draws=n_draws, lam=lam, thresh=thresh, num_eig=num_eig, catgorical_e=categorical_e )