# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import csv
from datetime import date
import itertools
from operator import itemgetter
import subprocess
import gzip
import logging
from allel.compat import zip_longest, PY2, text_type, range, unquote_plus
import numpy as np
import allel
from allel.util import asarray_ndim
logger = logging.getLogger(__name__)
debug = logger.debug
VCF_FIXED_FIELDS = 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO'
def write_vcf(path, variants, rename=None, number=None, description=None,
              fill=None, write_header=True):
    """Write a table of variants to `path` in VCF format. Columns named
    'FILTER_*' are written as FILTER entries, and any column other than the
    fixed VCF fields is written as an INFO entry. The optional `rename`,
    `number`, `description` and `fill` mappings override the ID, Number,
    Description and missing value used for individual columns.
    """
with open(path, 'w') as vcf_file:
if write_header:
write_vcf_header(vcf_file, variants=variants, rename=rename,
number=number, description=description)
write_vcf_data(vcf_file, variants=variants, rename=rename, fill=fill)
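# Hedged usage sketch (illustrative only, not part of the library): write a
# minimal variants table with one FILTER column and one INFO column. All
# field values and the output path 'example.vcf' are hypothetical.
#
#     variants = np.rec.array(
#         [(b'chr1', 100, b'A', b'T', 12.3, True, 42),
#          (b'chr1', 200, b'G', b'C', 9.9, False, 17)],
#         dtype=[('CHROM', 'S4'), ('POS', '<i4'), ('REF', 'S1'),
#                ('ALT', 'S1'), ('QUAL', '<f4'), ('FILTER_QD', '?'),
#                ('DP', '<i4')])
#     write_vcf('example.vcf', variants, description={'DP': 'Total depth'})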
def write_vcf_header(vcf_file, variants, rename, number, description):
if rename is None:
rename = dict()
if number is None:
number = dict()
if description is None:
description = dict()
# write file format version
print('##fileformat=VCFv4.1', file=vcf_file)
# write today's date
today = date.today().strftime('%Y%m%d')
print('##fileDate=%s' % today, file=vcf_file)
# write source
print('##source=scikit-allel-%s' % allel.__version__, file=vcf_file)
names = variants.dtype.names
info_names = [n for n in names
if not n.upper().startswith('FILTER_') and
not n.upper() in VCF_FIXED_FIELDS]
info_ids = [rename[n] if n in rename else n
for n in info_names]
# write INFO headers, sorted by ID
for name, vcf_id in sorted(zip(info_names, info_ids), key=itemgetter(1)):
col = variants[name]
# determine VCF Number
if name in number:
vcf_number = number[name]
else:
if col.ndim == 1 and col.dtype.kind == 'b':
# Flag
vcf_number = 0
elif col.ndim == 1:
vcf_number = 1
elif col.ndim == 2:
vcf_number = col.shape[1]
else:
                raise NotImplementedError('only columns with 1 or 2 '
                                          'dimensions are supported')
# determine VCF Type
kind = col.dtype.kind
if kind == 'b':
vcf_type = 'Flag'
elif kind in 'ui':
vcf_type = 'Integer'
elif kind == 'f':
vcf_type = 'Float'
else:
vcf_type = 'String'
# determine VCF Description
if name in description:
vcf_description = description[name]
else:
vcf_description = ''
# construct INFO header line
header_line = '##INFO=<ID=%s,Number=%s,Type=%s,Description="%s">'\
% (vcf_id, vcf_number, vcf_type, vcf_description)
print(header_line, file=vcf_file)
filter_names = [n for n in names if n.upper().startswith('FILTER_')]
filter_ids = [rename[n] if n in rename else n[7:]
for n in filter_names]
# write FILTER headers, sorted by ID
for name, vcf_id in sorted(zip(filter_names, filter_ids),
key=itemgetter(1)):
# determine VCF Description
if name in description:
vcf_description = description[name]
else:
vcf_description = ''
# construct FILTER header line
header_line = '##FILTER=<ID=%s,Description="%s">'\
% (vcf_id, vcf_description)
print(header_line, file=vcf_file)
# write column names
line = '#' + '\t'.join(VCF_FIXED_FIELDS)
print(line, file=vcf_file)
# noinspection PyShadowingBuiltins
def write_vcf_data(vcf_file, variants, rename, fill):
if rename is None:
rename = dict()
if fill is None:
fill = dict()
    # find the fixed columns, allowing for case-insensitive naming in the
    # input array
col_chrom = None
col_pos = None
col_id = None
col_ref = None
col_alt = None
col_qual = None
for n in variants.dtype.names:
if n.upper() == 'CHROM':
col_chrom = variants[n]
elif n.upper() == 'POS':
col_pos = variants[n]
elif n.upper() == 'ID':
col_id = variants[n]
elif n.upper() == 'REF':
col_ref = variants[n]
elif n.upper() == 'ALT':
col_alt = variants[n]
elif n.upper() == 'QUAL':
col_qual = variants[n]
# check for required columns
if col_chrom is None:
raise ValueError('CHROM column not found')
if col_pos is None:
raise ValueError('POS column not found')
# pad optional columns
dot = itertools.repeat('.')
if col_id is None:
col_id = dot
if col_ref is None:
col_ref = dot
if col_alt is None:
col_alt = dot
if col_qual is None:
col_qual = dot
# find FILTER columns
filter_names = [n for n in variants.dtype.names
if n.upper().startswith('FILTER_')]
filter_ids = [rename[n] if n in rename else n[7:]
for n in filter_names]
filter_cols = [variants[n] for n in filter_names]
# sort by ID
if filter_names:
filters = sorted(zip(filter_names, filter_ids, filter_cols),
key=itemgetter(1))
filter_names, filter_ids, filter_cols = zip(*filters)
# find INFO columns
info_names = [n for n in variants.dtype.names
if not n.upper().startswith('FILTER_') and
not n.upper() in VCF_FIXED_FIELDS]
info_ids = [rename[n] if n in rename else n
for n in info_names]
info_cols = [variants[n] for n in info_names]
# sort by ID
if info_names:
infos = sorted(zip(info_names, info_ids, info_cols),
key=itemgetter(1))
info_names, info_ids, info_cols = zip(*infos)
# setup writer
writer = csv.writer(vcf_file, delimiter='\t', lineterminator='\n')
# zip up data as rows
rows = zip(col_chrom, col_pos, col_id, col_ref, col_alt, col_qual)
filter_rows = zip(*filter_cols)
info_rows = zip(*info_cols)
for row, filter_row, info_row in zip_longest(rows, filter_rows, info_rows):
# unpack main row
chrom, pos, id, ref, alt, qual = row
chrom = _vcf_value_str(chrom)
pos = _vcf_value_str(pos)
id = _vcf_value_str(id)
ref = _vcf_value_str(ref)
alt = _vcf_value_str(alt, fill=fill.get('ALT', None))
qual = _vcf_value_str(qual)
# construct FILTER value
if filter_row is not None:
flt = [i for i, v in zip(filter_ids, filter_row) if v]
if flt:
flt = ';'.join(flt)
else:
flt = 'PASS'
else:
flt = '.'
# construct INFO value
if info_row is not None:
info_vals = [_vcf_info_str(n, i, v, fill)
for n, i, v in zip(info_names, info_ids, info_row)]
info_vals = [x for x in info_vals if x is not None]
info = ';'.join(info_vals)
else:
info = '.'
# repack
row = chrom, pos, id, ref, alt, qual, flt, info
writer.writerow(row)
def _vcf_value_str(o, fill=None):
if isinstance(o, bytes) and not PY2:
return str(o, encoding='ascii')
elif isinstance(o, (tuple, list, np.ndarray)):
if fill is None:
t = [_vcf_value_str(x) for x in o]
else:
t = [_vcf_value_str(x) for x in o if x != fill]
return ','.join(t)
else:
return str(o)
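# Hedged examples (illustrative only): scalars are stringified, sequence
# values are joined with commas, and elements equal to `fill` are dropped.
#
#     _vcf_value_str((b'T', b'C'))        # -> 'T,C'
#     _vcf_value_str([0.5, -1], fill=-1)  # -> '0.5'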
# noinspection PyShadowingBuiltins
def _vcf_info_str(name, id, value, fill):
if isinstance(value, (bool, np.bool_)):
if bool(value):
return id
else:
return None
else:
return '%s=%s' % (id, _vcf_value_str(value, fill=fill.get(name, None)))
def write_fasta(path, sequences, names, mode='w', width=80):
"""Write nucleotide sequences stored as numpy arrays to a FASTA file.
Parameters
----------
path : string
File path.
sequences : sequence of arrays
One or more ndarrays of dtype 'S1' containing the sequences.
names : sequence of strings
Names of the sequences.
mode : string, optional
Use 'a' to append to an existing file.
width : int, optional
Maximum line width.
"""
# check inputs
if isinstance(sequences, np.ndarray):
# single sequence
sequences = [sequences]
names = [names]
if len(sequences) != len(names):
raise ValueError('must provide the same number of sequences and names')
for sequence in sequences:
if sequence.dtype != np.dtype('S1'):
raise ValueError('expected S1 dtype, found %r' % sequence.dtype)
# force binary mode
mode = 'ab' if 'a' in mode else 'wb'
# write to file
with open(path, mode=mode) as fasta:
for name, sequence in zip(names, sequences):
# force bytes
if isinstance(name, text_type):
name = name.encode('ascii')
header = b'>' + name + b'\n'
fasta.write(header)
for i in range(0, sequence.size, width):
            line = sequence[i:i+width].tobytes() + b'\n'
fasta.write(line)
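# Hedged usage sketch (illustrative only): write two short sequences,
# wrapping lines at 10 characters. The path 'example.fasta' is hypothetical.
#
#     seq1 = np.frombuffer(b'ACGTACGTACGT', dtype='S1')
#     seq2 = np.frombuffer(b'TTTTCCCCGGGG', dtype='S1')
#     write_fasta('example.fasta', [seq1, seq2], ['seq1', 'seq2'], width=10)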
def gff3_parse_attributes(attributes_string):
"""Parse a string of GFF3 attributes ('key=value' pairs delimited by ';')
and return a dictionary.
"""
attributes = dict()
if not PY2:
# convert to ascii string to enable conversion of escaped characters
attributes_string = str(attributes_string, encoding='ascii')
fields = attributes_string.split(';')
for f in fields:
if '=' in f:
key, value = f.split('=')
key = unquote_plus(key).strip()
value = unquote_plus(value.strip())
if not PY2:
# convert back to bytes
value = value.encode('ascii')
attributes[key] = value
elif len(f) > 0:
# not strictly kosher
attributes[unquote_plus(f).strip()] = True
return attributes
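# Hedged example (illustrative only): keys are returned as text, values as
# bytes (on PY3), and a bare token (no '=') maps to True.
#
#     gff3_parse_attributes(b'ID=gene1;Name=BRCA2;pseudo')
#     # -> {'ID': b'gene1', 'Name': b'BRCA2', 'pseudo': True}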
def iter_gff3(path, attributes=None, region=None, score_fill=-1,
              phase_fill=-1, attributes_fill=b'.'):
    """Iterate over recognised records in a GFF3 file, yielding each record
    as a tuple of (seqid, source, type, start, end, score, strand, phase),
    extended with the values of any requested `attributes`. If `region` is
    given, the file is queried via tabix; missing scores, phases and
    attributes are replaced with the corresponding fill values.
    """
# prepare fill values for attributes
if attributes is not None:
if isinstance(attributes_fill, (list, tuple)):
            if len(attributes) != len(attributes_fill):
raise ValueError('number of fills does not match attributes')
else:
attributes_fill = [attributes_fill] * len(attributes)
# open input stream
needs_closing = False
if region is not None:
cmd = ['tabix', path, region]
buffer = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout
elif path.endswith('.gz') or path.endswith('.bgz'):
buffer = gzip.open(path, mode='rb')
needs_closing = True
else:
buffer = open(path, mode='rb')
needs_closing = True
debug(buffer)
try:
        for line in buffer:
            if line.startswith(b'>'):
                # assume beginning of embedded FASTA; stop iterating
                # (N.B., on PY3 line[0] yields an int, and raising
                # StopIteration inside a generator falls foul of PEP 479)
                return
            if line.startswith(b'#'):
                # skip comment lines
                continue
vals = line.split(b'\t')
if len(vals) == 9:
# unpack for processing
fseqid, fsource, ftype, fstart, fend, fscore, fstrand, \
fphase, fattrs = vals
# convert numerics
fstart = int(fstart)
fend = int(fend)
if fscore == b'.':
fscore = score_fill
else:
fscore = float(fscore)
if fphase == b'.':
fphase = phase_fill
else:
fphase = int(fphase)
rec = (fseqid, fsource, ftype, fstart, fend, fscore,
fstrand, fphase)
if attributes is not None:
dattrs = gff3_parse_attributes(fattrs)
vattrs = tuple(
dattrs.get(k, f)
for k, f in zip(attributes, attributes_fill)
)
rec += vattrs
yield rec
finally:
if needs_closing:
buffer.close()
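# Hedged usage sketch (illustrative only): iterate over gene records,
# extracting the 'ID' attribute. The path 'annotations.gff3.gz' is
# hypothetical, and querying by `region` additionally requires tabix on the
# system PATH.
#
#     for rec in iter_gff3('annotations.gff3.gz', attributes=['ID']):
#         seqid, source, ftype, start, end, score, strand, phase, gid = rec
#         if ftype == b'gene':
#             print(seqid, start, end, gid)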
def array_to_hdf5(a, parent, name, **kwargs):
"""Write a Numpy array to an HDF5 dataset.
Parameters
----------
a : ndarray
Data to write.
parent : string or h5py group
Parent HDF5 file or group. If a string, will be treated as HDF5 file
name.
name : string
Name or path of dataset to write data into.
kwargs : keyword arguments
        Passed through to the h5py require_dataset() function.
Returns
-------
h5d : h5py dataset
"""
import h5py
h5f = None
if isinstance(parent, str):
h5f = h5py.File(parent, mode='a')
parent = h5f
try:
kwargs.setdefault('chunks', True) # auto-chunking
kwargs.setdefault('dtype', a.dtype)
kwargs.setdefault('compression', 'gzip')
h5d = parent.require_dataset(name, shape=a.shape, **kwargs)
h5d[...] = a
return h5d
finally:
if h5f is not None:
h5f.close()
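# Hedged usage sketch (illustrative only): store an array in a new HDF5
# file, overriding the default gzip compression. The file name 'data.h5'
# and dataset name 'calldata' are hypothetical.
#
#     a = np.arange(100).reshape(50, 2)
#     array_to_hdf5(a, 'data.h5', 'calldata', compression='lzf')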
# noinspection PyIncorrectDocstring
def recarray_from_hdf5_group(*args, **kwargs):
"""Load a recarray from columns stored as separate datasets with an
HDF5 group.
Either provide an h5py group as a single positional argument,
or provide two positional arguments giving the HDF5 file path and the
group node path within the file.
The following optional parameters may be given.
Parameters
----------
start : int, optional
Index to start loading from.
stop : int, optional
Index to finish loading at.
condition : array_like, bool, optional
A 1-dimensional boolean array of the same length as the columns of the
table to load, indicating a selection of rows to load.
"""
import h5py
h5f = None
if len(args) == 1:
group = args[0]
elif len(args) == 2:
file_path, node_path = args
h5f = h5py.File(file_path, mode='r')
try:
group = h5f[node_path]
except:
h5f.close()
raise
else:
raise ValueError('bad arguments; expected group or (file_path, '
'node_path), found %s' % repr(args))
try:
if not isinstance(group, h5py.Group):
raise ValueError('expected group, found %r' % group)
# determine dataset names to load
available_dataset_names = [n for n in group.keys()
if isinstance(group[n], h5py.Dataset)]
names = kwargs.pop('names', available_dataset_names)
names = [str(n) for n in names] # needed for PY2
for n in names:
if n not in set(group.keys()):
raise ValueError('name not found: %s' % n)
if not isinstance(group[n], h5py.Dataset):
raise ValueError('name does not refer to a dataset: %s, %r'
% (n, group[n]))
# check datasets are aligned
datasets = [group[n] for n in names]
length = datasets[0].shape[0]
for d in datasets[1:]:
if d.shape[0] != length:
raise ValueError('datasets must be of equal length')
# determine start and stop parameters for load
start = kwargs.pop('start', 0)
stop = kwargs.pop('stop', length)
# check condition
condition = kwargs.pop('condition', None)
condition = asarray_ndim(condition, 1, allow_none=True)
if condition is not None and condition.size != length:
raise ValueError('length of condition does not match length '
'of datasets')
# setup output data
dtype = [(n, d.dtype, d.shape[1:]) for n, d in zip(names, datasets)]
ra = np.empty(length, dtype=dtype)
for n, d in zip(names, datasets):
a = d[start:stop]
if condition is not None:
a = np.compress(condition[start:stop], a, axis=0)
ra[n] = a
return ra
finally:
if h5f is not None:
h5f.close()
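# Hedged usage sketch (illustrative only): load two columns from a group,
# keeping only rows where a boolean condition holds. 'variants.h5' and the
# node path 'variants' are hypothetical; `keep` must match the dataset
# length.
#
#     keep = np.zeros(1000, dtype=bool)
#     keep[::10] = True
#     ra = recarray_from_hdf5_group('variants.h5', 'variants',
#                                   names=['CHROM', 'POS'], condition=keep)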
def recarray_to_hdf5_group(ra, parent, name, **kwargs):
"""Write each column in a recarray to a dataset in an HDF5 group.
Parameters
----------
ra : recarray
Numpy recarray to store.
parent : string or h5py group
Parent HDF5 file or group. If a string, will be treated as HDF5 file
name.
name : string
Name or path of group to write data into.
kwargs : keyword arguments
        Passed through to the h5py require_dataset() function.
Returns
-------
h5g : h5py group
"""
import h5py
h5f = None
if isinstance(parent, str):
h5f = h5py.File(parent, mode='a')
parent = h5f
try:
h5g = parent.require_group(name)
for n in ra.dtype.names:
array_to_hdf5(ra[n], h5g, n, **kwargs)
return h5g
finally:
if h5f is not None:
h5f.close()
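# Hedged usage sketch (illustrative only): round-trip a recarray through an
# HDF5 group. The file 'variants.h5' and group name 'variants' are
# hypothetical.
#
#     ra = np.rec.array([(b'chr1', 100), (b'chr2', 200)],
#                       dtype=[('CHROM', 'S4'), ('POS', '<i4')])
#     recarray_to_hdf5_group(ra, 'variants.h5', 'variants')
#     ra2 = recarray_from_hdf5_group('variants.h5', 'variants')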