Source code for allel.stats.sf

# -*- coding: utf-8 -*-
import numpy as np


from allel.util import asarray_ndim, check_integer_dtype


def _check_dac_n(dac, n):
    dac = asarray_ndim(dac, 1)
    check_integer_dtype(dac)
    mx = np.max(dac)
    if n is None:
        n = mx
    elif n < mx:
        raise ValueError('number of chromosomes too small; expected {}, found {}'
                         .format(n, mx))
    return dac, int(n)


def _check_ac_n(ac, n):
    ac = asarray_ndim(ac, 2)
    if ac.shape[1] != 2:
        raise ValueError('only biallelic variants are supported')
    check_integer_dtype(ac)
    mx = np.max(np.sum(ac, axis=1))
    if n is None:
        n = mx
    elif n < mx:
        raise ValueError('number of chromosomes too small; expected {}, found {}'
                         .format(n, mx))
    return ac, int(n)


[docs]def sfs(dac, n=None): """Compute the site frequency spectrum given derived allele counts at a set of biallelic variants. Parameters ---------- dac : array_like, int, shape (n_variants,) Array of derived allele counts. n : int, optional The total number of chromosomes called. Returns ------- sfs : ndarray, int, shape (n_chromosomes,) Array where the kth element is the number of variant sites with k derived alleles. """ # check input dac, n = _check_dac_n(dac, n) # need platform integer for bincount dac = dac.astype(int, copy=False) # compute site frequency spectrum x = n + 1 s = np.bincount(dac, minlength=x) return s
[docs]def sfs_folded(ac, n=None): """Compute the folded site frequency spectrum given reference and alternate allele counts at a set of biallelic variants. Parameters ---------- ac : array_like, int, shape (n_variants, 2) Allele counts array. n : int, optional The total number of chromosomes called. Returns ------- sfs_folded : ndarray, int, shape (n_chromosomes//2,) Array where the kth element is the number of variant sites with a minor allele count of k. """ # check input ac, n = _check_ac_n(ac, n) # compute minor allele counts mac = np.amin(ac, axis=1) # need platform integer for bincount mac = mac.astype(int, copy=False) # compute folded site frequency spectrum x = n//2 + 1 s = np.bincount(mac, minlength=x) return s
[docs]def sfs_scaled(dac, n=None): """Compute the site frequency spectrum scaled such that a constant value is expected across the spectrum for neutral variation and constant population size. Parameters ---------- dac : array_like, int, shape (n_variants,) Array of derived allele counts. n : int, optional The total number of chromosomes called. Returns ------- sfs_scaled : ndarray, int, shape (n_chromosomes,) An array where the value of the kth element is the number of variants with k derived alleles, multiplied by k. """ # compute site frequency spectrum s = sfs(dac, n=n) # apply scaling s = scale_sfs(s) return s
[docs]def scale_sfs(s): """Scale a site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes,) Site frequency spectrum. Returns ------- sfs_scaled : ndarray, int, shape (n_chromosomes,) Scaled site frequency spectrum. """ k = np.arange(s.size) out = s * k return out
[docs]def sfs_folded_scaled(ac, n=None): """Compute the folded site frequency spectrum scaled such that a constant value is expected across the spectrum for neutral variation and constant population size. Parameters ---------- ac : array_like, int, shape (n_variants, 2) Allele counts array. n : int, optional The total number of chromosomes called. Returns ------- sfs_folded_scaled : ndarray, int, shape (n_chromosomes//2,) An array where the value of the kth element is the number of variants with minor allele count k, multiplied by the scaling factor (k * (n - k) / n). """ # check input ac, n = _check_ac_n(ac, n) # compute the site frequency spectrum s = sfs_folded(ac, n=n) # apply scaling s = scale_sfs_folded(s, n) return s
[docs]def scale_sfs_folded(s, n): """Scale a folded site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes//2,) Folded site frequency spectrum. n : int Number of chromosomes called. Returns ------- sfs_folded_scaled : ndarray, int, shape (n_chromosomes//2,) Scaled folded site frequency spectrum. """ k = np.arange(s.shape[0]) out = s * k * (n - k) / n return out
[docs]def joint_sfs(dac1, dac2, n1=None, n2=None): """Compute the joint site frequency spectrum between two populations. Parameters ---------- dac1 : array_like, int, shape (n_variants,) Derived allele counts for the first population. dac2 : array_like, int, shape (n_variants,) Derived allele counts for the second population. n1, n2 : int, optional The total number of chromosomes called in each population. Returns ------- joint_sfs : ndarray, int, shape (m_chromosomes, n_chromosomes) Array where the (i, j)th element is the number of variant sites with i derived alleles in the first population and j derived alleles in the second population. """ # check inputs dac1, n1 = _check_dac_n(dac1, n1) dac2, n2 = _check_dac_n(dac2, n2) # compute site frequency spectrum x = n1 + 1 y = n2 + 1 # need platform integer for bincount tmp = (dac1 * y + dac2).astype(int, copy=False) s = np.bincount(tmp) s.resize(x, y) return s
[docs]def joint_sfs_folded(ac1, ac2, n1=None, n2=None): """Compute the joint folded site frequency spectrum between two populations. Parameters ---------- ac1 : array_like, int, shape (n_variants, 2) Allele counts for the first population. ac2 : array_like, int, shape (n_variants, 2) Allele counts for the second population. n1, n2 : int, optional The total number of chromosomes called in each population. Returns ------- joint_sfs_folded : ndarray, int, shape (n1//2 + 1, n2//2 + 1) Array where the (i, j)th element is the number of variant sites with a minor allele count of i in the first population and j in the second population. """ # check inputs ac1, n1 = _check_ac_n(ac1, n1) ac2, n2 = _check_ac_n(ac2, n2) # compute minor allele counts mac1 = np.amin(ac1, axis=1) mac2 = np.amin(ac2, axis=1) # compute site frequency spectrum x = n1//2 + 1 y = n2//2 + 1 tmp = (mac1 * y + mac2).astype(int, copy=False) s = np.bincount(tmp) s.resize(x, y) return s
[docs]def joint_sfs_scaled(dac1, dac2, n1=None, n2=None): """Compute the joint site frequency spectrum between two populations, scaled such that a constant value is expected across the spectrum for neutral variation, constant population size and unrelated populations. Parameters ---------- dac1 : array_like, int, shape (n_variants,) Derived allele counts for the first population. dac2 : array_like, int, shape (n_variants,) Derived allele counts for the second population. n1, n2 : int, optional The total number of chromosomes called in each population. Returns ------- joint_sfs_scaled : ndarray, int, shape (n1 + 1, n2 + 1) Array where the (i, j)th element is the scaled frequency of variant sites with i derived alleles in the first population and j derived alleles in the second population. """ # compute site frequency spectrum s = joint_sfs(dac1, dac2, n1=n1, n2=n2) # apply scaling s = scale_joint_sfs(s) return s
[docs]def scale_joint_sfs(s): """Scale a joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (n1, n2) Joint site frequency spectrum. Returns ------- joint_sfs_scaled : ndarray, int, shape (n1, n2) Scaled joint site frequency spectrum. """ i = np.arange(s.shape[0])[:, None] j = np.arange(s.shape[1])[None, :] out = (s * i) * j return out
[docs]def joint_sfs_folded_scaled(ac1, ac2, n1=None, n2=None): """Compute the joint folded site frequency spectrum between two populations, scaled such that a constant value is expected across the spectrum for neutral variation, constant population size and unrelated populations. Parameters ---------- ac1 : array_like, int, shape (n_variants, 2) Allele counts for the first population. ac2 : array_like, int, shape (n_variants, 2) Allele counts for the second population. n1, n2 : int, optional The total number of chromosomes called in each population. Returns ------- joint_sfs_folded_scaled : ndarray, int, shape (n1//2 + 1, n2//2 + 1) Array where the (i, j)th element is the scaled frequency of variant sites with a minor allele count of i in the first population and j in the second population. """ # noqa # check inputs ac1, n1 = _check_ac_n(ac1, n1) ac2, n2 = _check_ac_n(ac2, n2) # compute site frequency spectrum s = joint_sfs_folded(ac1, ac2, n1=n1, n2=n2) # apply scaling s = scale_joint_sfs_folded(s, n1, n2) return s
[docs]def scale_joint_sfs_folded(s, n1, n2): """Scale a folded joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (m_chromosomes//2, n_chromosomes//2) Folded joint site frequency spectrum. n1, n2 : int, optional The total number of chromosomes called in each population. Returns ------- joint_sfs_folded_scaled : ndarray, int, shape (m_chromosomes//2, n_chromosomes//2) Scaled folded joint site frequency spectrum. """ # noqa out = np.empty_like(s) for i in range(s.shape[0]): for j in range(s.shape[1]): out[i, j] = s[i, j] * i * j * (n1 - i) * (n2 - j) return out
[docs]def fold_sfs(s, n): """Fold a site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes,) Site frequency spectrum n : int Total number of chromosomes called. Returns ------- sfs_folded : ndarray, int Folded site frequency spectrum """ # check inputs s = asarray_ndim(s, 1) assert s.shape[0] <= n + 1, 'invalid number of chromosomes' # need to check s has all entries up to n if s.shape[0] < n + 1: sn = np.zeros(n + 1, dtype=s.dtype) sn[:s.shape[0]] = s s = sn # fold nf = (n + 1) // 2 n = nf * 2 o = s[:nf] + s[nf:n][::-1] return o
[docs]def fold_joint_sfs(s, n1, n2): """Fold a joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (m_chromosomes, n_chromosomes) Joint site frequency spectrum. n1, n2 : int, optional The total number of chromosomes called in each population. Returns ------- joint_sfs_folded : ndarray, int Folded joint site frequency spectrum. """ # check inputs s = asarray_ndim(s, 2) assert s.shape[0] <= n1 + 1, 'invalid number of chromosomes' assert s.shape[1] <= n2 + 1, 'invalid number of chromosomes' # need to check s has all entries up to m if s.shape[0] < n1 + 1: sm = np.zeros((n1 + 1, s.shape[1]), dtype=s.dtype) sm[:s.shape[0]] = s s = sm # need to check s has all entries up to n if s.shape[1] < n2 + 1: sn = np.zeros((s.shape[0], n2 + 1), dtype=s.dtype) sn[:, :s.shape[1]] = s s = sn # fold mf = (n1 + 1) // 2 nf = (n2 + 1) // 2 n1 = mf * 2 n2 = nf * 2 o = (s[:mf, :nf] + # top left s[mf:n1, :nf][::-1] + # top right s[:mf, nf:n2][:, ::-1] + # bottom left s[mf:n1, nf:n2][::-1, ::-1]) # bottom right return o
[docs]def plot_sfs(s, yscale='log', bins=None, n=None, clip_endpoints=True, label=None, plot_kwargs=None, ax=None): """Plot a site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes,) Site frequency spectrum. yscale : string, optional Y axis scale. bins : int or array_like, int, optional Allele count bins. n : int, optional Number of chromosomes sampled. If provided, X axis will be plotted as allele frequency, otherwise as allele count. clip_endpoints : bool, optional If True, do not plot first and last values from frequency spectrum. label : string, optional Label for data series in plot. plot_kwargs : dict-like Additional keyword arguments, passed through to ax.plot(). ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. Returns ------- ax : axes The axes on which the plot was drawn. """ import matplotlib.pyplot as plt import scipy # check inputs s = asarray_ndim(s, 1) # setup axes if ax is None: fig, ax = plt.subplots() # setup data if bins is None: if clip_endpoints: x = np.arange(1, s.shape[0]-1) y = s[1:-1] else: x = np.arange(s.shape[0]) y = s else: if clip_endpoints: y, b, _ = scipy.stats.binned_statistic( np.arange(1, s.shape[0]-1), values=s[1:-1], bins=bins, statistic='sum') else: y, b, _ = scipy.stats.binned_statistic( np.arange(s.shape[0]), values=s, bins=bins, statistic='sum') # use bin midpoints for plotting x = (b[:-1] + b[1:]) / 2 if n: # convert allele counts to allele frequencies x = x / n ax.set_xlabel('derived allele frequency') else: ax.set_xlabel('derived allele count') # do plotting if plot_kwargs is None: plot_kwargs = dict() ax.plot(x, y, label=label, **plot_kwargs) # tidy ax.set_yscale(yscale) ax.set_ylabel('site frequency') ax.autoscale(axis='x', tight=True) return ax
# noinspection PyIncorrectDocstring
[docs]def plot_sfs_folded(*args, **kwargs): """Plot a folded site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes/2,) Site frequency spectrum. yscale : string, optional Y axis scale. bins : int or array_like, int, optional Allele count bins. n : int, optional Number of chromosomes sampled. If provided, X axis will be plotted as allele frequency, otherwise as allele count. clip_endpoints : bool, optional If True, do not plot first and last values from frequency spectrum. label : string, optional Label for data series in plot. plot_kwargs : dict-like Additional keyword arguments, passed through to ax.plot(). ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. Returns ------- ax : axes The axes on which the plot was drawn. """ ax = plot_sfs(*args, **kwargs) n = kwargs.get('n', None) if n: ax.set_xlabel('minor allele frequency') else: ax.set_xlabel('minor allele count') return ax
# noinspection PyIncorrectDocstring
[docs]def plot_sfs_scaled(*args, **kwargs): """Plot a scaled site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes,) Site frequency spectrum. yscale : string, optional Y axis scale. bins : int or array_like, int, optional Allele count bins. n : int, optional Number of chromosomes sampled. If provided, X axis will be plotted as allele frequency, otherwise as allele count. clip_endpoints : bool, optional If True, do not plot first and last values from frequency spectrum. label : string, optional Label for data series in plot. plot_kwargs : dict-like Additional keyword arguments, passed through to ax.plot(). ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. Returns ------- ax : axes The axes on which the plot was drawn. """ kwargs.setdefault('yscale', 'linear') ax = plot_sfs(*args, **kwargs) ax.set_ylabel('scaled site frequency') return ax
# noinspection PyIncorrectDocstring
[docs]def plot_sfs_folded_scaled(*args, **kwargs): """Plot a folded scaled site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes/2,) Site frequency spectrum. yscale : string, optional Y axis scale. bins : int or array_like, int, optional Allele count bins. n : int, optional Number of chromosomes sampled. If provided, X axis will be plotted as allele frequency, otherwise as allele count. clip_endpoints : bool, optional If True, do not plot first and last values from frequency spectrum. label : string, optional Label for data series in plot. plot_kwargs : dict-like Additional keyword arguments, passed through to ax.plot(). ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. Returns ------- ax : axes The axes on which the plot was drawn. """ kwargs.setdefault('yscale', 'linear') ax = plot_sfs_folded(*args, **kwargs) ax.set_ylabel('scaled site frequency') n = kwargs.get('n', None) if n: ax.set_xlabel('minor allele frequency') else: ax.set_xlabel('minor allele count') return ax
[docs]def plot_joint_sfs(s, ax=None, imshow_kwargs=None): """Plot a joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes_pop1, n_chromosomes_pop2) Joint site frequency spectrum. ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. imshow_kwargs : dict-like Additional keyword arguments, passed through to ax.imshow(). Returns ------- ax : axes The axes on which the plot was drawn. """ import matplotlib.pyplot as plt from matplotlib.colors import LogNorm # check inputs s = asarray_ndim(s, 2) # setup axes if ax is None: w = plt.rcParams['figure.figsize'][0] fig, ax = plt.subplots(figsize=(w, w)) # set plotting defaults if imshow_kwargs is None: imshow_kwargs = dict() imshow_kwargs.setdefault('cmap', 'jet') imshow_kwargs.setdefault('interpolation', 'none') imshow_kwargs.setdefault('aspect', 'auto') imshow_kwargs.setdefault('norm', LogNorm()) # plot data ax.imshow(s.T, **imshow_kwargs) # tidy ax.invert_yaxis() ax.set_xlabel('derived allele count (population 1)') ax.set_ylabel('derived allele count (population 2)') return ax
# noinspection PyIncorrectDocstring
[docs]def plot_joint_sfs_folded(*args, **kwargs): """Plot a joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes_pop1/2, n_chromosomes_pop2/2) Joint site frequency spectrum. ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. imshow_kwargs : dict-like Additional keyword arguments, passed through to ax.imshow(). Returns ------- ax : axes The axes on which the plot was drawn. """ ax = plot_joint_sfs(*args, **kwargs) ax.set_xlabel('minor allele count (population 1)') ax.set_ylabel('minor allele count (population 2)') return ax
# noinspection PyIncorrectDocstring
[docs]def plot_joint_sfs_scaled(*args, **kwargs): """Plot a scaled joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes_pop1, n_chromosomes_pop2) Joint site frequency spectrum. ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. imshow_kwargs : dict-like Additional keyword arguments, passed through to ax.imshow(). Returns ------- ax : axes The axes on which the plot was drawn. """ imshow_kwargs = kwargs.get('imshow_kwargs', dict()) imshow_kwargs.setdefault('norm', None) kwargs['imshow_kwargs'] = imshow_kwargs ax = plot_joint_sfs(*args, **kwargs) return ax
# noinspection PyIncorrectDocstring
[docs]def plot_joint_sfs_folded_scaled(*args, **kwargs): """Plot a scaled folded joint site frequency spectrum. Parameters ---------- s : array_like, int, shape (n_chromosomes_pop1/2, n_chromosomes_pop2/2) Joint site frequency spectrum. ax : axes, optional Axes on which to draw. If not provided, a new figure will be created. imshow_kwargs : dict-like Additional keyword arguments, passed through to ax.imshow(). Returns ------- ax : axes The axes on which the plot was drawn. """ imshow_kwargs = kwargs.get('imshow_kwargs', dict()) imshow_kwargs.setdefault('norm', None) kwargs['imshow_kwargs'] = imshow_kwargs ax = plot_joint_sfs_folded(*args, **kwargs) ax.set_xlabel('minor allele count (population 1)') ax.set_ylabel('minor allele count (population 2)') return ax