Source code for allel.stats.ld

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division


import numpy as np


from allel.stats.window import windowed_statistic
from allel.util import asarray_ndim, ensure_square
from allel.chunked import get_blen_array
from allel.compat import memoryview_safe
from allel.opt.stats import gn_pairwise_corrcoef_int8, gn_pairwise2_corrcoef_int8, \
    gn_locate_unlinked_int8


[docs]def rogers_huff_r(gn): """Estimate the linkage disequilibrium parameter *r* for each pair of variants using the method of Rogers and Huff (2008). Parameters ---------- gn : array_like, int8, shape (n_variants, n_samples) Diploid genotypes at biallelic variants, coded as the number of alternate alleles per call (i.e., 0 = hom ref, 1 = het, 2 = hom alt). Returns ------- r : ndarray, float, shape (n_variants * (n_variants - 1) // 2,) Matrix in condensed form. Examples -------- >>> import allel >>> g = allel.GenotypeArray([[[0, 0], [1, 1], [0, 0]], ... [[0, 0], [1, 1], [0, 0]], ... [[1, 1], [0, 0], [1, 1]], ... [[0, 0], [0, 1], [-1, -1]]], dtype='i1') >>> gn = g.to_n_alt(fill=-1) >>> gn array([[ 0, 2, 0], [ 0, 2, 0], [ 2, 0, 2], [ 0, 1, -1]], dtype=int8) >>> r = allel.rogers_huff_r(gn) >>> r # doctest: +ELLIPSIS array([ 1. , -1.0000001, 1. , -1.0000001, 1. , -1. ], dtype=float32) >>> r ** 2 # doctest: +ELLIPSIS array([1. , 1.0000002, 1. , 1.0000002, 1. , 1. ], dtype=float32) >>> from scipy.spatial.distance import squareform >>> squareform(r ** 2) array([[0. , 1. , 1.0000002, 1. ], [1. , 0. , 1.0000002, 1. ], [1.0000002, 1.0000002, 0. , 1. ], [1. , 1. , 1. , 0. ]], dtype=float32) """ # check inputs gn = asarray_ndim(gn, 2, dtype='i1') gn = memoryview_safe(gn) # compute correlation coefficients r = gn_pairwise_corrcoef_int8(gn) # convenience for singletons if r.size == 1: r = r[0] return r
[docs]def rogers_huff_r_between(gna, gnb): """Estimate the linkage disequilibrium parameter *r* for each pair of variants between the two input arrays, using the method of Rogers and Huff (2008). Parameters ---------- gna, gnb : array_like, int8, shape (n_variants, n_samples) Diploid genotypes at biallelic variants, coded as the number of alternate alleles per call (i.e., 0 = hom ref, 1 = het, 2 = hom alt). Returns ------- r : ndarray, float, shape (m_variants, n_variants ) Matrix in rectangular form. """ # check inputs gna = asarray_ndim(gna, 2, dtype='i1') gnb = asarray_ndim(gnb, 2, dtype='i1') gna = memoryview_safe(gna) gnb = memoryview_safe(gnb) # compute correlation coefficients r = gn_pairwise2_corrcoef_int8(gna, gnb) # convenience for singletons if r.size == 1: r = r[0, 0] return r
[docs]def locate_unlinked(gn, size=100, step=20, threshold=.1, blen=None): """Locate variants in approximate linkage equilibrium, where r**2 is below the given `threshold`. Parameters ---------- gn : array_like, int8, shape (n_variants, n_samples) Diploid genotypes at biallelic variants, coded as the number of alternate alleles per call (i.e., 0 = hom ref, 1 = het, 2 = hom alt). size : int Window size (number of variants). step : int Number of variants to advance to the next window. threshold : float Maximum value of r**2 to include variants. blen : int, optional Block length to use for chunked computation. Returns ------- loc : ndarray, bool, shape (n_variants) Boolean array where True items locate variants in approximate linkage equilibrium. Notes ----- The value of r**2 between each pair of variants is calculated using the method of Rogers and Huff (2008). """ # check inputs if not hasattr(gn, 'shape') or not hasattr(gn, 'dtype'): gn = np.asarray(gn, dtype='i1') if gn.ndim != 2: raise ValueError('gn must have two dimensions') # setup output loc = np.ones(gn.shape[0], dtype='u1') # compute in chunks to avoid loading big arrays into memory blen = get_blen_array(gn, blen) blen = max(blen, 10*size) # avoid too small chunks n_variants = gn.shape[0] for i in range(0, n_variants, blen): # N.B., ensure overlap with next window j = min(n_variants, i+blen+size) gnb = np.asarray(gn[i:j], dtype='i1') gnb = memoryview_safe(gnb) locb = loc[i:j] gn_locate_unlinked_int8(gnb, locb, size, step, threshold) return loc.astype('b1')
[docs]def windowed_r_squared(pos, gn, size=None, start=None, stop=None, step=None, windows=None, fill=np.nan, percentile=50): """Summarise linkage disequilibrium in windows over a single chromosome/contig. Parameters ---------- pos : array_like, int, shape (n_items,) The item positions in ascending order, using 1-based coordinates.. gn : array_like, int8, shape (n_variants, n_samples) Diploid genotypes at biallelic variants, coded as the number of alternate alleles per call (i.e., 0 = hom ref, 1 = het, 2 = hom alt). size : int, optional The window size (number of bases). start : int, optional The position at which to start (1-based). stop : int, optional The position at which to stop (1-based). step : int, optional The distance between start positions of windows. If not given, defaults to the window size, i.e., non-overlapping windows. windows : array_like, int, shape (n_windows, 2), optional Manually specify the windows to use as a sequence of (window_start, window_stop) positions, using 1-based coordinates. Overrides the size/start/stop/step parameters. fill : object, optional The value to use where a window is empty, i.e., contains no items. percentile : int or sequence of ints, optional The percentile or percentiles to calculate within each window. Returns ------- out : ndarray, shape (n_windows,) The value of the statistic for each window. windows : ndarray, int, shape (n_windows, 2) The windows used, as an array of (window_start, window_stop) positions, using 1-based coordinates. counts : ndarray, int, shape (n_windows,) The number of items in each window. Notes ----- Linkage disequilibrium (r**2) is calculated using the method of Rogers and Huff (2008). See Also -------- allel.stats.window.windowed_statistic """ # define the statistic function if isinstance(percentile, (list, tuple)): fill = [fill for _ in percentile] def statistic(gnw): r_squared = rogers_huff_r(gnw) ** 2 return [np.percentile(r_squared, p) for p in percentile] else: def statistic(gnw): r_squared = rogers_huff_r(gnw) ** 2 return np.percentile(r_squared, percentile) return windowed_statistic(pos, gn, statistic, size, start=start, stop=stop, step=step, windows=windows, fill=fill)
[docs]def plot_pairwise_ld(m, colorbar=True, ax=None, imshow_kwargs=None): """Plot a matrix of genotype linkage disequilibrium values between all pairs of variants. Parameters ---------- m : array_like Array of linkage disequilibrium values in condensed form. colorbar : bool, optional If True, add a colorbar to the current figure. ax : axes, optional The axes on which to draw. If not provided, a new figure will be created. imshow_kwargs : dict-like, optional Additional keyword arguments passed through to :func:`matplotlib.pyplot.imshow`. Returns ------- ax : axes The axes on which the plot was drawn. """ import matplotlib.pyplot as plt # check inputs m_square = ensure_square(m) # blank out lower triangle and flip up/down m_square = np.tril(m_square)[::-1, :] # set up axes if ax is None: # make a square figure with enough pixels to represent each variant x = m_square.shape[0] / plt.rcParams['figure.dpi'] x = max(x, plt.rcParams['figure.figsize'][0]) fig, ax = plt.subplots(figsize=(x, x)) fig.tight_layout(pad=0) # setup imshow arguments if imshow_kwargs is None: imshow_kwargs = dict() imshow_kwargs.setdefault('interpolation', 'none') imshow_kwargs.setdefault('cmap', 'Greys') imshow_kwargs.setdefault('vmin', 0) imshow_kwargs.setdefault('vmax', 1) # plot as image im = ax.imshow(m_square, **imshow_kwargs) # tidy up ax.set_xticks([]) ax.set_yticks([]) for s in 'bottom', 'right': ax.spines[s].set_visible(False) if colorbar: plt.gcf().colorbar(im, shrink=.5, pad=0) return ax