# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import numpy as np
from allel.util import asarray_ndim
[docs]def sfs(dac):
"""Compute the site frequency spectrum given derived allele counts at
a set of biallelic variants.
Parameters
----------
dac : array_like, int, shape (n_variants,)
Array of derived allele counts.
Returns
-------
sfs : ndarray, int, shape (n_chromosomes,)
Array where the kth element is the number of variant sites with k
derived alleles.
"""
# check input
dac = asarray_ndim(dac, 1)
# compute site frequency spectrum
s = np.bincount(dac)
return s
[docs]def sfs_folded(ac):
"""Compute the folded site frequency spectrum given reference and
alternate allele counts at a set of biallelic variants.
Parameters
----------
ac : array_like, int, shape (n_variants, 2)
Allele counts array.
Returns
-------
sfs_folded : ndarray, int, shape (n_chromosomes//2,)
Array where the kth element is the number of variant sites with a
minor allele count of k.
"""
# check input
ac = asarray_ndim(ac, 2)
assert ac.shape[1] == 2, 'only biallelic variants are supported'
# compute minor allele counts
mac = np.amin(ac, axis=1)
# compute folded site frequency spectrum
s = np.bincount(mac)
return s
[docs]def sfs_scaled(dac):
"""Compute the site frequency spectrum scaled such that a constant value is
expected across the spectrum for neutral variation and constant
population size.
Parameters
----------
dac : array_like, int, shape (n_variants,)
Array of derived allele counts.
Returns
-------
sfs_scaled : ndarray, int, shape (n_chromosomes,)
An array where the value of the kth element is the number of variants
with k derived alleles, multiplied by k.
"""
# compute site frequency spectrum
s = sfs(dac)
# apply scaling
s = scale_sfs(s)
return s
[docs]def scale_sfs(s):
"""Scale a site frequency spectrum.
Parameters
----------
s : array_like, int, shape (n_chromosomes,)
Site frequency spectrum.
Returns
-------
sfs_scaled : ndarray, int, shape (n_chromosomes,)
Scaled site frequency spectrum.
"""
k = np.arange(s.size)
out = s * k
return out
[docs]def sfs_folded_scaled(ac, n=None):
"""Compute the folded site frequency spectrum scaled such that a constant
value is expected across the spectrum for neutral variation and constant
population size.
Parameters
----------
ac : array_like, int, shape (n_variants, 2)
Allele counts array.
n : int, optional
The total number of chromosomes called at each variant site. Equal to
the number of samples multiplied by the ploidy. If not provided,
will be inferred to be the maximum value of the sum of reference and
alternate allele counts present in `ac`.
Returns
-------
sfs_folded_scaled : ndarray, int, shape (n_chromosomes//2,)
An array where the value of the kth element is the number of variants
with minor allele count k, multiplied by the scaling factor
(k * (n - k) / n).
"""
# compute the site frequency spectrum
s = sfs_folded(ac)
# determine the total number of chromosomes called
if n is None:
n = np.amax(np.sum(ac, axis=1))
# apply scaling
s = scale_sfs_folded(s, n)
return s
[docs]def scale_sfs_folded(s, n):
"""Scale a folded site frequency spectrum.
Parameters
----------
s : array_like, int, shape (n_chromosomes//2,)
Folded site frequency spectrum.
n : int
Number of chromosomes called.
Returns
-------
sfs_folded_scaled : ndarray, int, shape (n_chromosomes//2,)
Scaled folded site frequency spectrum.
"""
k = np.arange(s.shape[0])
out = s * k * (n - k) / n
return out
[docs]def joint_sfs(dac1, dac2):
"""Compute the joint site frequency spectrum between two populations.
Parameters
----------
dac1 : array_like, int, shape (n_variants,)
Derived allele counts for the first population.
dac2 : array_like, int, shape (n_variants,)
Derived allele counts for the second population.
Returns
-------
joint_sfs : ndarray, int, shape (m_chromosomes, n_chromosomes)
Array where the (i, j)th element is the number of variant sites with i
derived alleles in the first population and j derived alleles in the
second population.
"""
# check inputs
dac1 = asarray_ndim(dac1, 1)
dac2 = asarray_ndim(dac2, 1)
# compute site frequency spectrum
n = np.max(dac1) + 1
m = np.max(dac2) + 1
s = np.bincount(dac1 * m + dac2)
s.resize((n, m))
return s
[docs]def joint_sfs_folded(ac1, ac2):
"""Compute the joint folded site frequency spectrum between two
populations.
Parameters
----------
ac1 : array_like, int, shape (n_variants, 2)
Allele counts for the first population.
ac2 : array_like, int, shape (n_variants, 2)
Allele counts for the second population.
Returns
-------
joint_sfs_folded : ndarray, int, shape (m_chromosomes//2, n_chromosomes//2)
Array where the (i, j)th element is the number of variant sites with a
minor allele count of i in the first population and j in the second
population.
"""
# check inputs
ac1 = asarray_ndim(ac1, 2)
ac2 = asarray_ndim(ac2, 2)
assert ac1.shape[1] == ac2.shape[1] == 2, \
'only biallelic variants are supported'
# compute minor allele counts
mac1 = np.amin(ac1, axis=1)
mac2 = np.amin(ac2, axis=1)
# compute site frequency spectrum
m = np.max(mac1) + 1
n = np.max(mac2) + 1
s = np.bincount(mac1 * n + mac2)
s.resize((m, n))
return s
[docs]def joint_sfs_scaled(dac1, dac2):
"""Compute the joint site frequency spectrum between two populations,
scaled such that a constant value is expected across the spectrum for
neutral variation, constant population size and unrelated populations.
Parameters
----------
dac1 : array_like, int, shape (n_variants,)
Derived allele counts for the first population.
dac2 : array_like, int, shape (n_variants,)
Derived allele counts for the second population.
Returns
-------
joint_sfs_scaled : ndarray, int, shape (m_chromosomes, n_chromosomes)
Array where the (i, j)th element is the scaled frequency of variant
sites with i derived alleles in the first population and j derived
alleles in the second population.
"""
# compute site frequency spectrum
s = joint_sfs(dac1, dac2)
# apply scaling
s = scale_joint_sfs(s)
return s
[docs]def scale_joint_sfs(s):
"""Scale a joint site frequency spectrum.
Parameters
----------
s : array_like, int, shape (m_chromosomes, n_chromosomes)
Joint site frequency spectrum.
Returns
-------
joint_sfs_scaled : ndarray, int, shape (m_chromosomes, n_chromosomes)
Scaled joint site frequency spectrum.
"""
i = np.arange(s.shape[0])[:, None]
j = np.arange(s.shape[1])[None, :]
out = (s * i) * j
return out
[docs]def joint_sfs_folded_scaled(ac1, ac2, m=None, n=None):
"""Compute the joint folded site frequency spectrum between two
populations, scaled such that a constant value is expected across the
spectrum for neutral variation, constant population size and unrelated
populations.
Parameters
----------
ac1 : array_like, int, shape (n_variants, 2)
Allele counts for the first population.
ac2 : array_like, int, shape (n_variants, 2)
Allele counts for the second population.
m : int, optional
Number of chromosomes called in the first population.
n : int, optional
Number of chromosomes called in the second population.
Returns
-------
joint_sfs_folded_scaled : ndarray, int, shape (m_chromosomes//2, n_chromosomes//2)
Array where the (i, j)th element is the scaled frequency of variant
sites with a minor allele count of i in the first population and j
in the second population.
""" # noqa
# compute site frequency spectrum
s = joint_sfs_folded(ac1, ac2)
# determine the total number of chromosomes called
if m is None:
m = np.amax(np.sum(ac1, axis=1))
if n is None:
n = np.amax(np.sum(ac2, axis=1))
# apply scaling
s = scale_joint_sfs_folded(s, m, n)
return s
[docs]def scale_joint_sfs_folded(s, m, n):
"""Scale a folded joint site frequency spectrum.
Parameters
----------
s : array_like, int, shape (m_chromosomes//2, n_chromosomes//2)
Folded joint site frequency spectrum.
m : int
Number of chromosomes called in the first population.
n : int
Number of chromosomes called in the second population.
Returns
-------
joint_sfs_folded_scaled : ndarray, int, shape (m_chromosomes//2, n_chromosomes//2)
Scaled folded joint site frequency spectrum.
""" # noqa
out = np.empty_like(s)
for i in range(s.shape[0]):
for j in range(s.shape[1]):
out[i, j] = s[i, j] * i * j * (m-i) * (n-j)
return out
[docs]def fold_sfs(s, n):
"""Fold a site frequency spectrum.
Parameters
----------
s : array_like, int, shape (n_chromosomes,)
Site frequency spectrum
n : int
Total number of chromosomes called.
Returns
-------
sfs_folded : ndarray, int
Folded site frequency spectrum
"""
# check inputs
s = asarray_ndim(s, 1)
assert s.shape[0] <= n + 1, 'invalid number of chromosomes'
# need to check s has all entries up to n
if s.shape[0] < n + 1:
sn = np.zeros(n + 1, dtype=s.dtype)
sn[:s.shape[0]] = s
s = sn
# fold
nf = (n + 1) // 2
n = nf * 2
o = s[:nf] + s[nf:n][::-1]
return o
[docs]def fold_joint_sfs(s, m, n):
"""Fold a joint site frequency spectrum.
Parameters
----------
s : array_like, int, shape (m_chromosomes, n_chromosomes)
Joint site frequency spectrum.
m : int
Number of chromosomes called in the first population.
n : int
Number of chromosomes called in the second population.
Returns
-------
joint_sfs_folded : ndarray, int
Folded joint site frequency spectrum.
"""
# check inputs
s = asarray_ndim(s, 2)
assert s.shape[0] <= m + 1, 'invalid number of chromosomes'
assert s.shape[1] <= n + 1, 'invalid number of chromosomes'
# need to check s has all entries up to m
if s.shape[0] < m + 1:
sm = np.zeros((m + 1, s.shape[1]), dtype=s.dtype)
sm[:s.shape[0]] = s
s = sm
# need to check s has all entries up to n
if s.shape[1] < n + 1:
sn = np.zeros((s.shape[0], n + 1), dtype=s.dtype)
sn[:, :s.shape[1]] = s
s = sn
# fold
mf = (m + 1) // 2
nf = (n + 1) // 2
m = mf * 2
n = nf * 2
o = (s[:mf, :nf] + # top left
s[mf:m, :nf][::-1] + # top right
s[:mf, nf:n][:, ::-1] + # bottom left
s[mf:m, nf:n][::-1, ::-1]) # bottom right
return o
[docs]def plot_sfs(s, yscale='log', bins=None, n=None,
clip_endpoints=True, label=None, plot_kwargs=None,
ax=None):
import matplotlib.pyplot as plt
import scipy
# check inputs
s = asarray_ndim(s, 1)
# setup axes
if ax is None:
fig, ax = plt.subplots()
# setup data
if bins is None:
if clip_endpoints:
x = np.arange(1, s.shape[0]-1)
y = s[1:-1]
else:
x = np.arange(s.shape[0])
y = s
else:
if clip_endpoints:
y, b, _ = scipy.stats.binned_statistic(
np.arange(1, s.shape[0]-1),
values=s[1:-1],
bins=bins,
statistic='sum')
else:
y, b, _ = scipy.stats.binned_statistic(
np.arange(s.shape[0]),
values=s,
bins=bins,
statistic='sum')
# use bin midpoints for plotting
x = (b[:-1] + b[1:]) / 2
if n:
# convert allele counts to allele frequencies
x = x / n
ax.set_xlabel('derived allele frequency')
else:
ax.set_xlabel('derived allele count')
# do plotting
if plot_kwargs is None:
plot_kwargs = dict()
ax.plot(x, y, label=label, **plot_kwargs)
# tidy
ax.set_yscale(yscale)
ax.set_ylabel('site frequency')
ax.autoscale(axis='x', tight=True)
return ax
[docs]def plot_sfs_folded(*args, **kwargs):
ax = plot_sfs(*args, **kwargs)
m = kwargs.get('m', None)
if m:
ax.set_xlabel('minor allele frequency')
else:
ax.set_xlabel('minor allele count')
return ax
[docs]def plot_sfs_scaled(*args, **kwargs):
kwargs.setdefault('yscale', 'linear')
ax = plot_sfs(*args, **kwargs)
ax.set_ylabel('scaled site frequency')
return ax
[docs]def plot_sfs_folded_scaled(*args, **kwargs):
kwargs.setdefault('yscale', 'linear')
ax = plot_sfs_folded(*args, **kwargs)
ax.set_ylabel('scaled site frequency')
return ax
[docs]def plot_joint_sfs(s, ax=None, imshow_kwargs=None):
import matplotlib.pyplot as plt
import matplotlib as mpl
# check inputs
s = asarray_ndim(s, 2)
# setup axes
if ax is None:
w = plt.rcParams['figure.figsize'][0]
fig, ax = plt.subplots(figsize=(w, w))
# set plotting defaults
if imshow_kwargs is None:
imshow_kwargs = dict()
imshow_kwargs.setdefault('cmap', 'jet')
imshow_kwargs.setdefault('interpolation', 'none')
imshow_kwargs.setdefault('aspect', 'auto')
imshow_kwargs.setdefault('norm', mpl.colors.LogNorm())
# plot data
ax.imshow(s, **imshow_kwargs)
# tidy
ax.xaxis.tick_top()
ax.set_ylabel('derived allele count (population 1)')
ax.set_xlabel('derived allele count (population 2)')
ax.xaxis.set_label_position('top')
return ax
[docs]def plot_joint_sfs_folded(*args, **kwargs):
ax = plot_joint_sfs(*args, **kwargs)
ax.set_ylabel('minor allele count (population 1)')
ax.set_xlabel('minor allele count (population 2)')
return ax
[docs]def plot_joint_sfs_scaled(*args, **kwargs):
imshow_kwargs = kwargs.get('imshow_kwargs', dict())
imshow_kwargs.setdefault('norm', None)
kwargs['imshow_kwargs'] = imshow_kwargs
ax = plot_joint_sfs(*args, **kwargs)
return ax
[docs]def plot_joint_sfs_folded_scaled(*args, **kwargs):
imshow_kwargs = kwargs.get('imshow_kwargs', dict())
imshow_kwargs.setdefault('norm', None)
kwargs['imshow_kwargs'] = imshow_kwargs
ax = plot_joint_sfs_folded(*args, **kwargs)
return ax