Source code for allel.io.vcf_write

# -*- coding: utf-8 -*-
import csv
from datetime import date
import itertools
from operator import itemgetter
import logging


import numpy as np


import allel


logger = logging.getLogger(__name__)
debug = logger.debug


VCF_FIXED_FIELDS = 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO'


def normalize_callset(callset):

    if hasattr(callset, 'keys'):
        names = list()
        new_callset = dict()
        for k in list(callset.keys()):
            a = callset[k]
            if k.startswith('calldata/'):
                continue
            if k == 'samples':
                continue
            if k.startswith('variants/'):
                k = k[9:]
            names.append(k)
            new_callset[k] = a
        callset = new_callset
    elif hasattr(callset, 'dtype') and callset.dtype.names:
        names = list(callset.dtype.names)
    else:
        raise ValueError('callset should be dict or recarray, found %r' % callset)

    return names, callset


[docs]def write_vcf(path, callset, rename=None, number=None, description=None, fill=None, write_header=True): """Preliminary support for writing a VCF file. Currently does not support sample data. Needs further work.""" names, callset = normalize_callset(callset) with open(path, 'w') as vcf_file: if write_header: write_vcf_header(vcf_file, names, callset=callset, rename=rename, number=number, description=description) write_vcf_data(vcf_file, names, callset=callset, rename=rename, fill=fill)
def write_vcf_header(vcf_file, names, callset, rename, number, description): if rename is None: rename = dict() if number is None: number = dict() if description is None: description = dict() # write file format version print('##fileformat=VCFv4.1', file=vcf_file) # write today's date today = date.today().strftime('%Y%m%d') print('##fileDate=%s' % today, file=vcf_file) # write source print('##source=scikit-allel-%s' % allel.__version__, file=vcf_file) info_names = [n for n in names if not n.upper().startswith('FILTER_') and not n.upper() in VCF_FIXED_FIELDS] info_ids = [rename[n] if n in rename else n for n in info_names] # write INFO headers, sorted by ID for name, vcf_id in sorted(zip(info_names, info_ids), key=itemgetter(1)): col = callset[name] # determine VCF Number if name in number: vcf_number = number[name] else: if col.ndim == 1 and col.dtype.kind == 'b': # Flag vcf_number = 0 elif col.ndim == 1: vcf_number = 1 elif col.ndim == 2: vcf_number = col.shape[1] else: raise NotImplementedError('only columns with 1 or two ' 'dimensions are supported') # determine VCF Type kind = col.dtype.kind if kind == 'b': vcf_type = 'Flag' elif kind in 'ui': vcf_type = 'Integer' elif kind == 'f': vcf_type = 'Float' else: vcf_type = 'String' # determine VCF Description if name in description: vcf_description = description[name] else: vcf_description = '' # construct INFO header line header_line = '##INFO=<ID=%s,Number=%s,Type=%s,Description="%s">'\ % (vcf_id, vcf_number, vcf_type, vcf_description) print(header_line, file=vcf_file) filter_names = [n for n in names if n.upper().startswith('FILTER_')] filter_ids = [rename[n] if n in rename else n[7:] for n in filter_names] # write FILTER headers, sorted by ID for name, vcf_id in sorted(zip(filter_names, filter_ids), key=itemgetter(1)): # determine VCF Description if name in description: vcf_description = description[name] else: vcf_description = '' # construct FILTER header line header_line = '##FILTER=<ID=%s,Description="%s">'\ % (vcf_id, vcf_description) print(header_line, file=vcf_file) # write column names line = '#' + '\t'.join(VCF_FIXED_FIELDS) print(line, file=vcf_file) # noinspection PyShadowingBuiltins def write_vcf_data(vcf_file, names, callset, rename, fill): if rename is None: rename = dict() if fill is None: fill = dict() # find the fixed columns, allowing for case insensitive naming in the # input array col_chrom = None col_pos = None col_id = None col_ref = None col_alt = None col_qual = None for n in names: if n.upper() == 'CHROM': col_chrom = callset[n] elif n.upper() == 'POS': col_pos = callset[n] elif n.upper() == 'ID': col_id = callset[n] elif n.upper() == 'REF': col_ref = callset[n] elif n.upper() == 'ALT': col_alt = callset[n] elif n.upper() == 'QUAL': col_qual = callset[n] # check for required columns if col_chrom is None: raise ValueError('CHROM column not found') if col_pos is None: raise ValueError('POS column not found') # pad optional columns dot = itertools.repeat('.') if col_id is None: col_id = dot if col_ref is None: col_ref = dot if col_alt is None: col_alt = dot if col_qual is None: col_qual = dot # find FILTER columns filter_names = [n for n in names if n.upper().startswith('FILTER_')] filter_ids = [rename[n] if n in rename else n[7:] for n in filter_names] filter_cols = [callset[n] for n in filter_names] # sort by ID if filter_names: filters = sorted(zip(filter_names, filter_ids, filter_cols), key=itemgetter(1)) filter_names, filter_ids, filter_cols = zip(*filters) # find INFO columns info_names = [n for n in names if not n.upper().startswith('FILTER_') and not n.upper() in VCF_FIXED_FIELDS] info_ids = [rename[n] if n in rename else n for n in info_names] info_cols = [callset[n] for n in info_names] # sort by ID if info_names: infos = sorted(zip(info_names, info_ids, info_cols), key=itemgetter(1)) info_names, info_ids, info_cols = zip(*infos) # setup writer writer = csv.writer(vcf_file, delimiter='\t', lineterminator='\n') # zip up data as rows rows = zip(col_chrom, col_pos, col_id, col_ref, col_alt, col_qual) filter_rows = zip(*filter_cols) info_rows = zip(*info_cols) for row, filter_row, info_row in itertools.zip_longest(rows, filter_rows, info_rows): # unpack main row chrom, pos, id, ref, alt, qual = row chrom = _vcf_value_str(chrom) pos = _vcf_value_str(pos) id = _vcf_value_str(id) ref = _vcf_value_str(ref) alt = _vcf_value_str(alt, fill=fill.get('ALT', None)) qual = _vcf_value_str(qual) # construct FILTER value if filter_row is not None: flt = [i for i, v in zip(filter_ids, filter_row) if v] if flt: flt = ';'.join(flt) else: flt = 'PASS' else: flt = '.' # construct INFO value if info_row is not None: info_vals = [_vcf_info_str(n, i, v, fill) for n, i, v in zip(info_names, info_ids, info_row)] info_vals = [x for x in info_vals if x is not None] info = ';'.join(info_vals) else: info = '.' # repack row = chrom, pos, id, ref, alt, qual, flt, info writer.writerow(row) def _vcf_value_str(o, fill=None): if isinstance(o, bytes): return str(o, encoding='ascii') elif isinstance(o, (tuple, list, np.ndarray)): if fill is None: t = [_vcf_value_str(x) for x in o] else: t = [_vcf_value_str(x) for x in o if x != fill] return ','.join(t) else: return str(o) # noinspection PyShadowingBuiltins def _vcf_info_str(name, id, value, fill): if isinstance(value, (bool, np.bool_)): if bool(value): return id else: return None else: return '%s=%s' % (id, _vcf_value_str(value, fill=fill.get(name, None)))