Source code for allel.io.vcf_read

# -*- coding: utf-8 -*-
"""
Functions for extracting data from Variant Call Format (VCF) files and loading
into NumPy arrays, NumPy files, HDF5 files or Zarr array stores.

WONTFIX/POSTPONED:

* GenotypeArray.from_vcf
* HaplotypeArray.from_vcf(ploidy)
* VariantTable.from_vcf
* is_phased computed field
* Feature to rename fields, e.g., calldata/GT -> calldata/genotype. Could be implemented via
  transformer.
* Specialised parser for EFF - obsolete.

"""
from __future__ import absolute_import, print_function, division
import gzip
import os
import re
from collections import namedtuple
import warnings
import time
import subprocess


import numpy as np


from allel.compat import PY2, FileNotFoundError
from allel.opt.io_vcf_read import VCFChunkIterator, FileInputStream
# expose some names from cython extension
# noinspection PyUnresolvedReferences
from allel.opt.io_vcf_read import ANNTransformer, ANN_AA_LENGTH_FIELD, ANN_AA_POS_FIELD, \
    ANN_ALLELE_FIELD, ANN_ANNOTATION_FIELD, ANN_ANNOTATION_IMPACT_FIELD, ANN_CDNA_LENGTH_FIELD, \
    ANN_CDNA_POS_FIELD, ANN_CDS_LENGTH_FIELD, ANN_CDS_POS_FIELD, ANN_DISTANCE_FIELD, \
    ANN_FEATURE_ID_FIELD, ANN_FEATURE_TYPE_FIELD, ANN_FIELD, ANN_FIELDS, ANN_GENE_ID_FIELD, \
    ANN_GENE_NAME_FIELD, ANN_HGVS_C_FIELD, ANN_HGVS_P_FIELD, ANN_RANK_FIELD, \
    ANN_TRANSCRIPT_BIOTYPE_FIELD  # flake8: noqa


DEFAULT_BUFFER_SIZE = 2**14
DEFAULT_CHUNK_LENGTH = 2**16
DEFAULT_CHUNK_WIDTH = 2**6
DEFAULT_ALT_NUMBER = 3


def _prep_fields_param(fields):
    """Prepare the `fields` parameter, and determine whether or not to store samples."""

    store_samples = False

    if fields is None:
        # add samples by default
        return True, None

    if isinstance(fields, str):
        fields = [fields]
    else:
        fields = list(fields)

    if 'samples' in fields:
        fields.remove('samples')
        store_samples = True
    elif '*' in fields:
        store_samples = True

    return store_samples, fields


def _chunk_iter_progress(it, log, prefix):
    """Wrap a chunk iterator for progress logging."""
    n_variants = 0
    before_all = time.time()
    before_chunk = before_all
    for chunk, chunk_length, chrom, pos in it:
        after_chunk = time.time()
        elapsed_chunk = after_chunk - before_chunk
        elapsed = after_chunk - before_all
        n_variants += chunk_length
        if not PY2:
            chrom = str(chrom, 'ascii')
        print('%s %s rows in %.2fs; chunk in %.2fs (%s rows/s); %s:%s' %
              (prefix, n_variants, elapsed, elapsed_chunk,
               int(chunk_length//elapsed_chunk), chrom, pos),
              file=log)
        log.flush()
        yield chunk, chunk_length, chrom, pos
        before_chunk = after_chunk
    after_all = time.time()
    elapsed = after_all - before_all
    print('%s all done (%s rows/s)' %
          (prefix, int(n_variants//elapsed)), file=log)
    log.flush()


def _chunk_iter_transform(it, transformers):
    for chunk, chunk_length, chrom, pos in it:
        for transformer in transformers:
            transformer.transform_chunk(chunk)
        yield chunk, chunk_length, chrom, pos


_doc_param_input = \
    """Path to VCF file on the local file system. May be uncompressed or gzip-compatible
        compressed file. May also be a file-like object (e.g., `io.BytesIO`)."""

_doc_param_fields = \
    """Fields to extract data for. Should be a list of strings, e.g., ``['variants/CHROM',
        'variants/POS', 'variants/DP', 'calldata/GT']``. If you are feeling lazy, you can drop
        the 'variants/' and 'calldata/' prefixes, in which case the fields will be matched against
        fields declared in the VCF header, with variants taking priority over calldata if a field
        with the same ID exists both in INFO and FORMAT headers. I.e., ``['CHROM', 'POS', 'DP',
        'GT']`` will work, although watch out for fields like 'DP' which can be both
        INFO and FORMAT. For convenience, some special string values are also recognized. To
        extract all fields, provide just the string ``'*'``. To extract all variants fields
        (including all INFO fields) provide ``'variants/*'``. To extract all calldata fields (i.e.,
        defined in FORMAT headers) provide ``'calldata/*'``."""

_doc_param_types = \
    """Overide data types. Should be a dictionary mapping field names to NumPy data types.
        E.g., providing the dictionary ``{'variants/DP': 'i8', 'calldata/GQ': 'i2'}`` will mean
        the 'variants/DP' field is stored in a 64-bit integer array, and the 'calldata/GQ' field
        is stored in a 16-bit integer array."""

_doc_param_numbers = \
    """Override the expected number of values. Should be a dictionary mapping field names to
        integers. E.g., providing the dictionary ``{'variants/ALT': 5, 'variants/AC': 5,
        'calldata/HQ': 2}`` will mean that, for each variant, 5 values are stored for the
        'variants/ALT' field, 5 values are stored for the 'variants/AC' field, and for each
        sample, 2 values are stored for the 'calldata/HQ' field."""

_doc_param_alt_number = \
    """Assume this number of alternate alleles and set expected number of values accordingly for
        any field declared with number 'A' or 'R' in the VCF meta-information."""

_doc_param_fills = \
    """Override the fill value used for empty values. Should be a dictionary mapping field names
        to fill values."""

_doc_param_region = \
    """Genomic region to extract variants for. If provided, should be a tabix-style region string,
        which can be either just a chromosome name (e.g., '2L'), or a chromosome name followed by
        1-based beginning and end coordinates (e.g., '2L:100000-200000'). Note that only variants
        whose start position (POS) is within the requested range will be included. This is slightly
        different from the default tabix behaviour, where a variant (e.g., deletion) may be included
        if its position (POS) occurs before the requested region but its reference allele overlaps
        the region - such a variant will *not* be included in the data returned by this function."""

_doc_param_tabix = \
    """Name or path to tabix executable. Only required if `region` is given. Setting `tabix` to
        `None` will cause a fall-back to scanning through the VCF file from the beginning, which
        may be much slower than tabix but the only option if tabix is not available on your system
        and/or the VCF file has not been tabix-indexed."""

_doc_param_samples = \
    """Selection of samples to extract calldata for. If provided, should be a list of strings
        giving sample identifiers. May also be a list of integers giving indices of selected
        samples."""

_doc_param_transformers = \
    """Transformers for post-processing data. If provided, should be a list of Transformer
        objects, each of which must implement a "transform()" method that accepts a dict
        containing the chunk of data to be transformed. See also the :class:`ANNTransformer`
        class which implements post-processing of data from SNPEFF."""

_doc_param_buffer_size = \
    """Size in bytes of the I/O buffer used when reading data from the underlying file or tabix
        stream."""

_doc_param_chunk_length = \
    """Length (number of variants) of chunks in which data are processed."""

_doc_param_log = \
    """A file-like object (e.g., `sys.stderr`) to print progress information."""


[docs]def read_vcf(input, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', samples=None, transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, log=None): """Read data from a VCF file into NumPy arrays. Parameters ---------- input : string or file-like {input} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} samples : list of strings {samples} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} log : file-like, optional {log} Returns ------- data : dict[str, ndarray] A dictionary holding arrays. """ # samples requested? # noinspection PyTypeChecker store_samples, fields = _prep_fields_param(fields) # setup _, samples, _, it = iter_vcf_chunks( input=input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, fills=fills, region=region, tabix=tabix, samples=samples, transformers=transformers ) # setup progress logging if log is not None: it = _chunk_iter_progress(it, log, prefix='[read_vcf]') # read all chunks into a list chunks = [chunk for chunk, _, _, _ in it] # setup output output = dict() if len(samples) > 0 and store_samples: output['samples'] = samples if chunks: # find array keys keys = sorted(chunks[0].keys()) # concatenate chunks for k in keys: output[k] = np.concatenate([chunk[k] for chunk in chunks], axis=0) return output
read_vcf.__doc__ = read_vcf.__doc__.format( input=_doc_param_input, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, samples=_doc_param_samples, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, log=_doc_param_log, ) _doc_param_output = \ """File-system path to write output to.""" _doc_param_overwrite = \ """If False (default), do not overwrite an existing file."""
[docs]def vcf_to_npz(input, output, compressed=True, overwrite=False, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix=True, samples=None, transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, log=None): """Read data from a VCF file into NumPy arrays and save as a .npz file. Parameters ---------- input : string {input} output : string {output} compressed : bool, optional If True (default), save with compression. overwrite : bool, optional {overwrite} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} samples : list of strings {samples} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} log : file-like, optional {log} """ # guard condition if not overwrite and os.path.exists(output): raise ValueError('file exists at path %r; use overwrite=True to replace' % output) # read all data into memory data = read_vcf( input=input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, log=log, fills=fills, region=region, tabix=tabix, samples=samples, transformers=transformers ) # setup save function if compressed: savez = np.savez_compressed else: savez = np.savez # save as npz savez(output, **data)
vcf_to_npz.__doc__ = vcf_to_npz.__doc__.format( input=_doc_param_input, output=_doc_param_output, overwrite=_doc_param_overwrite, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, samples=_doc_param_samples, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, log=_doc_param_log, ) def _hdf5_setup_datasets(chunk, root, chunk_length, chunk_width, compression, compression_opts, shuffle, overwrite, headers): import h5py # handle no input if chunk is None: raise RuntimeError('input file has no data?') # setup datasets keys = sorted(chunk.keys()) for k in keys: # obtain initial data data = chunk[k] # determine chunk shape if data.ndim == 1: chunk_shape = (chunk_length,) else: chunk_shape = (chunk_length, min(chunk_width, data.shape[1])) + data.shape[2:] # create dataset group, name = k.split('/') if name in root[group]: if overwrite: del root[group][name] else: raise ValueError('dataset exists at path %r; use overwrite=True to replace' % k) shape = (0,) + data.shape[1:] maxshape = (None,) + data.shape[1:] if data.dtype.kind == 'O': dt = h5py.special_dtype(vlen=str) else: dt = data.dtype ds = root[group].create_dataset( name, shape=shape, maxshape=maxshape, chunks=chunk_shape, dtype=dt, compression=compression, compression_opts=compression_opts, shuffle=shuffle ) # copy metadata from VCF headers meta = None if group == 'variants' and name in headers.infos: meta = headers.infos[name] elif group == 'calldata' and name in headers.formats: meta = headers.formats[name] if meta is not None: ds.attrs['ID'] = meta['ID'] ds.attrs['Number'] = meta['Number'] ds.attrs['Type'] = meta['Type'] ds.attrs['Description'] = meta['Description'] return keys def _hdf5_store_chunk(root, keys, chunk): # compute length of current chunk current_chunk_length = chunk[keys[0]].shape[0] # find current length of datasets old_length = root[keys[0]].shape[0] # new length of all arrays after loading this chunk new_length = old_length + current_chunk_length # load arrays for k in keys: # data to be loaded data = chunk[k] # obtain dataset dataset = root[k] # ensure dataset is large enough dataset.resize(new_length, axis=0) # store the data dataset[old_length:new_length, ...] = data _doc_param_chunk_width = \ """Width (number of samples) to use when storing chunks in output."""
[docs]def vcf_to_hdf5(input, output, group='/', compression='gzip', compression_opts=1, shuffle=False, overwrite=False, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', samples=None, transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, chunk_width=DEFAULT_CHUNK_WIDTH, log=None): """Read data from a VCF file and load into an HDF5 file. Parameters ---------- input : string {input} output : string {output} group : string Group within destination HDF5 file to store data in. compression : string Compression algorithm, e.g., 'gzip' (default). compression_opts : int Compression level, e.g., 1 (default). shuffle : bool Use byte shuffling, which may improve compression (default is False). overwrite : bool {overwrite} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} samples : list of strings {samples} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} chunk_width : int, optional {chunk_width} log : file-like, optional {log} """ import h5py # samples requested? # noinspection PyTypeChecker store_samples, fields = _prep_fields_param(fields) with h5py.File(output, mode='a') as h5f: # obtain root group that data will be stored into root = h5f.require_group(group) # ensure sub-groups root.require_group('variants') root.require_group('calldata') # setup chunk iterator _, samples, headers, it = iter_vcf_chunks( input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, fills=fills, region=region, tabix=tabix, samples=samples, transformers=transformers ) # setup progress logging if log is not None: it = _chunk_iter_progress(it, log, prefix='[vcf_to_hdf5]') if len(samples) > 0 and store_samples: # store samples name = 'samples' if name in root[group]: if overwrite: del root[group][name] else: raise ValueError('dataset exists at path %r; use overwrite=True to replace' % name) if samples.dtype.kind == 'O': t = h5py.special_dtype(vlen=str) else: t = samples.dtype root[group].create_dataset(name, data=samples, chunks=None, dtype=t) # read first chunk chunk, _, _, _ = next(it) # setup datasets # noinspection PyTypeChecker keys = _hdf5_setup_datasets( chunk=chunk, root=root, chunk_length=chunk_length, chunk_width=chunk_width, compression=compression, compression_opts=compression_opts, shuffle=shuffle, overwrite=overwrite, headers=headers ) # store first chunk _hdf5_store_chunk(root, keys, chunk) # store remaining chunks for chunk, _, _, _ in it: _hdf5_store_chunk(root, keys, chunk)
vcf_to_hdf5.__doc__ = vcf_to_hdf5.__doc__.format( input=_doc_param_input, output=_doc_param_output, overwrite=_doc_param_overwrite, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, samples=_doc_param_samples, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, chunk_width=_doc_param_chunk_width, log=_doc_param_log, ) _zarr_string_codec = None # noinspection PyUnresolvedReferences def _get_zarr_string_codec(): global _zarr_string_codec if _zarr_string_codec is None: import zarr import numcodecs from distutils.version import StrictVersion # need to register codecs for string encoding for zarr < 2.2 need_to_register = StrictVersion(zarr.__version__) < StrictVersion('2.2') try: from numcodecs import MsgPack _zarr_string_codec = MsgPack() if need_to_register: zarr.codecs.codec_registry[numcodecs.MsgPack.codec_id] = MsgPack except ImportError: from numcodecs import Pickle _zarr_string_codec = Pickle() if need_to_register: zarr.codecs.codec_registry[numcodecs.Pickle.codec_id] = Pickle return _zarr_string_codec def _zarr_setup_datasets(chunk, root, chunk_length, chunk_width, compressor, overwrite, headers): # handle no input if chunk is None: raise RuntimeError('input file has no data?') # setup datasets keys = sorted(chunk.keys()) for k in keys: # obtain initial data data = chunk[k] # determine chunk shape if data.ndim == 1: chunk_shape = (chunk_length,) else: chunk_shape = (chunk_length, min(chunk_width, data.shape[1])) + data.shape[2:] # create dataset shape = (0,) + data.shape[1:] if data.dtype.kind == 'O': filters = [_get_zarr_string_codec()] else: filters = None ds = root.create_dataset(k, shape=shape, chunks=chunk_shape, dtype=data.dtype, compressor=compressor, overwrite=overwrite, filters=filters) # copy metadata from VCF headers group, name = k.split('/') meta = None if group == 'variants' and name in headers.infos: meta = headers.infos[name] elif group == 'calldata' and name in headers.formats: meta = headers.formats[name] if meta is not None: ds.attrs['ID'] = meta['ID'] ds.attrs['Number'] = meta['Number'] ds.attrs['Type'] = meta['Type'] ds.attrs['Description'] = meta['Description'] return keys def _zarr_store_chunk(root, keys, chunk): # load arrays for k in keys: # append data root[k].append(chunk[k], axis=0)
[docs]def vcf_to_zarr(input, output, group='/', compressor='default', overwrite=False, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', samples=None, transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, chunk_width=DEFAULT_CHUNK_WIDTH, log=None): """Read data from a VCF file and load into a Zarr on-disk store. Parameters ---------- input : string {input} output : string {output} group : string Group within destination Zarr hierarchy to store data in. compressor : compressor Compression algorithm, e.g., zarr.Blosc(cname='zstd', clevel=1, shuffle=1). overwrite : bool {overwrite} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} samples : list of strings {samples} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} chunk_width : int, optional {chunk_width} log : file-like, optional {log} """ import zarr # samples requested? # noinspection PyTypeChecker store_samples, fields = _prep_fields_param(fields) # open root group root = zarr.open_group(output, mode='a', path=group) # ensure sub-groups root.require_group('variants') root.require_group('calldata') # setup chunk iterator _, samples, headers, it = iter_vcf_chunks( input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, fills=fills, region=region, tabix=tabix, samples=samples, transformers=transformers ) # setup progress logging if log is not None: it = _chunk_iter_progress(it, log, prefix='[vcf_to_zarr]') if len(samples) > 0 and store_samples: # store samples if samples.dtype.kind == 'O': filters = [_get_zarr_string_codec()] else: filters = None root[group].create_dataset('samples', data=samples, compressor=None, overwrite=overwrite, filters=filters) # read first chunk chunk, _, _, _ = next(it) # setup datasets # noinspection PyTypeChecker keys = _zarr_setup_datasets( chunk, root=root, chunk_length=chunk_length, chunk_width=chunk_width, compressor=compressor, overwrite=overwrite, headers=headers ) # store first chunk _zarr_store_chunk(root, keys, chunk) # store remaining chunks for chunk, _, _, _ in it: _zarr_store_chunk(root, keys, chunk)
vcf_to_zarr.__doc__ = vcf_to_zarr.__doc__.format( input=_doc_param_input, output=_doc_param_output, overwrite=_doc_param_overwrite, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, samples=_doc_param_samples, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, chunk_width=_doc_param_chunk_width, log=_doc_param_log, )
[docs]def iter_vcf_chunks(input, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', samples=None, transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH): """Iterate over chunks of data from a VCF file as NumPy arrays. Parameters ---------- input : string {input} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} samples : list of strings {samples} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} Returns ------- fields : list of strings Normalised names of fields that will be extracted. samples : ndarray Samples for which data will be extracted. headers : VCFHeaders Tuple of metadata extracted from VCF headers. it : iterator Chunk iterator. """ # setup commmon keyword args kwds = dict(fields=fields, types=types, numbers=numbers, alt_number=alt_number, chunk_length=chunk_length, fills=fills, samples=samples) # obtain a file-like object close = False if isinstance(input, str) and input.endswith('gz'): if region and tabix and os.name != 'nt': try: # try tabix p = subprocess.Popen([tabix, '-h', input, region], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0) # check if tabix exited early, look for tabix error time.sleep(.5) poll = p.poll() if poll is not None and poll > 0: err = p.stdout.read() if not PY2: err = str(err, 'ascii') p.stdout.close() raise RuntimeError(err.strip()) fileobj = p.stdout close = True # N.B., still pass the region parameter through so we get strictly only # variants that start within the requested region. See also # https://github.com/alimanfoo/vcfnp/issues/54 except FileNotFoundError: # no tabix, fall back to scanning warnings.warn('tabix not found, falling back to scanning to region') fileobj = gzip.open(input, mode='rb') close = True except Exception as e: warnings.warn('error occurred attempting tabix (%s); falling back to ' 'scanning to region' % e) fileobj = gzip.open(input, mode='rb') close = True else: fileobj = gzip.open(input, mode='rb') close = True elif isinstance(input, str): # assume no compression fileobj = open(input, mode='rb', buffering=0) close = True elif hasattr(input, 'readinto'): fileobj = input else: raise ValueError('path must be string or file-like, found %r' % input) # setup input stream stream = FileInputStream(fileobj, buffer_size=buffer_size, close=close) # deal with region kwds['region'] = region # setup iterator fields, samples, headers, it = _iter_vcf_stream(stream, **kwds) # setup transformers if transformers is not None: # API flexibility if not isinstance(transformers, (list, tuple)): transformers = [transformers] for trans in transformers: fields = trans.transform_fields(fields) it = _chunk_iter_transform(it, transformers) return fields, samples, headers, it
iter_vcf_chunks.__doc__ = iter_vcf_chunks.__doc__.format( input=_doc_param_input, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, samples=_doc_param_samples, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, log=_doc_param_log, ) FIXED_VARIANTS_FIELDS = ( 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', ) def _normalize_field_prefix(field, headers): # already contains prefix? if field.startswith('variants/') or field.startswith('calldata/'): return field # try to find in fixed fields elif field in FIXED_VARIANTS_FIELDS: return 'variants/' + field # try to find in FILTER elif field.startswith('FILTER_'): return 'variants/' + field # try to find in FILTER elif field in headers.filters: return 'variants/FILTER_' + field # try to find in INFO elif field in headers.infos: return 'variants/' + field # try to find in FORMAT elif field in headers.formats: return 'calldata/' + field else: # assume anything else in variants, even if header not found return 'variants/' + field def _check_field(field, headers): # assume field is already normalized for prefix group, name = field.split('/') if group == 'variants': if name in FIXED_VARIANTS_FIELDS: pass elif name in ['numalt', 'svlen', 'is_snp']: # computed fields pass elif name.startswith('FILTER_'): filter_name = name[7:] if filter_name in headers.filters: pass else: warnings.warn('%r FILTER header not found' % filter_name) elif name in headers.infos: pass else: warnings.warn('%r INFO header not found' % name) elif group == 'calldata': if name in headers.formats: pass else: warnings.warn('%r FORMAT header not found' % name) else: # should never be reached raise ValueError('invalid field specification: %r' % field) def _add_all_fields(fields, headers, samples): _add_all_variants_fields(fields, headers) if len(samples) > 0: _add_all_calldata_fields(fields, headers) def _add_all_variants_fields(fields, headers): _add_all_fixed_variants_fields(fields) _add_all_info_fields(fields, headers) _add_all_filter_fields(fields, headers) # add in computed fields for f in 'variants/numalt', 'variants/svlen', 'variants/is_snp': if f not in fields: fields.append(f) def _add_all_fixed_variants_fields(fields): for k in FIXED_VARIANTS_FIELDS: f = 'variants/' + k if f not in fields: fields.append(f) def _add_all_info_fields(fields, headers): for k in headers.infos: f = 'variants/' + k if f not in fields: fields.append(f) def _add_all_filter_fields(fields, headers): fields.append('variants/FILTER_PASS') for k in headers.filters: f = 'variants/FILTER_' + k if f not in fields: fields.append(f) def _add_all_calldata_fields(fields, headers): # only add calldata fields if there are samples if headers.samples: for k in headers.formats: f = 'calldata/' + k if f not in fields: fields.append(f) def _normalize_fields(fields, headers, samples): # setup normalized fields normed_fields = list() # special case, single field specification if isinstance(fields, str): fields = [fields] for f in fields: # special cases: be lenient about how to specify if f in ['*', 'kitchen sink']: _add_all_fields(normed_fields, headers, samples) elif f in ['variants', 'variants*', 'variants/*']: _add_all_variants_fields(normed_fields, headers) elif f in ['calldata', 'calldata*', 'calldata/*'] and len(samples) > 0: _add_all_calldata_fields(normed_fields, headers) elif f in ['INFO', 'INFO*', 'INFO/*', 'variants/INFO', 'variants/INFO*', 'variants/INFO/*']: _add_all_info_fields(normed_fields, headers) elif f in ['FILTER', 'FILTER*', 'FILTER/*', 'FILTER_*', 'variants/FILTER', 'variants/FILTER*', 'variants/FILTER/*', 'variants/FILTER_*']: _add_all_filter_fields(normed_fields, headers) # exact field specification else: # normalize field specification f = _normalize_field_prefix(f, headers) _check_field(f, headers) if f.startswith('calldata/') and len(samples) == 0: # only add calldata fields if there are samples pass elif f not in normed_fields: normed_fields.append(f) return normed_fields default_integer_dtype = 'i4' default_float_dtype = 'f4' default_string_dtype = 'object' def _normalize_type(t): if t == 'Integer': return np.dtype(default_integer_dtype) elif t == 'Float': return np.dtype(default_float_dtype) elif t == 'String': return np.dtype(default_string_dtype) elif t == 'Flag': return np.dtype(bool) elif isinstance(t, str) and t.startswith('genotype/'): # custom genotype dtype return t elif isinstance(t, str) and t.startswith('genotype_ac/'): # custom genotype allele counts dtype return t else: return np.dtype(t) default_types = { 'variants/CHROM': 'object', 'variants/POS': 'i4', 'variants/ID': 'object', 'variants/REF': 'object', 'variants/ALT': 'object', 'variants/QUAL': 'f4', 'variants/DP': 'i4', 'variants/AN': 'i4', 'variants/AC': 'i4', 'variants/AF': 'f4', 'variants/MQ': 'f4', 'variants/ANN': 'object', 'calldata/GT': 'genotype/i1', 'calldata/GQ': 'i1', 'calldata/HQ': 'i1', 'calldata/DP': 'i2', 'calldata/AD': 'i2', 'calldata/MQ0': 'i2', 'calldata/MQ': 'f2', } def _normalize_types(types, fields, headers): # normalize user-provided types if types is None: types = dict() types = {_normalize_field_prefix(f, headers): _normalize_type(t) for f, t in types.items()} # setup output normed_types = dict() for f in fields: group, name = f.split('/') if f in types: normed_types[f] = types[f] elif f in default_types: normed_types[f] = _normalize_type(default_types[f]) elif group == 'variants': if name in ['numalt', 'svlen', 'is_snp']: # computed fields, special case continue elif name.startswith('FILTER_'): normed_types[f] = np.dtype(bool) elif name in headers.infos: normed_types[f] = _normalize_type(headers.infos[name]['Type']) else: # fall back to string normed_types[f] = _normalize_type('String') warnings.warn('no type for field %r, assuming %s' % (f, normed_types[f])) elif group == 'calldata': if name in headers.formats: normed_types[f] = _normalize_type(headers.formats[name]['Type']) else: # fall back to string normed_types[f] = _normalize_type('String') warnings.warn('no type for field %r, assuming %s' % (f, normed_types[f])) else: raise RuntimeError('unpected field: %r' % f) return normed_types default_numbers = { 'variants/CHROM': 1, 'variants/POS': 1, 'variants/ID': 1, 'variants/REF': 1, 'variants/ALT': 'A', 'variants/QUAL': 1, 'variants/DP': 1, 'variants/AN': 1, 'variants/AC': 'A', 'variants/AF': 'A', 'variants/MQ': 1, 'variants/ANN': 1, 'calldata/DP': 1, 'calldata/GT': 2, 'calldata/GQ': 1, 'calldata/HQ': 2, 'calldata/AD': 'R', 'calldata/MQ0': 1, 'calldata/MQ': 1, } def _normalize_number(field, n, alt_number): if n == '.': return 1 elif n == 'A': return alt_number elif n == 'R': return alt_number + 1 elif n == 'G': return 3 else: try: return int(n) except ValueError: warnings.warn('error parsing %r as number for field %r' % (n, field)) return 1 def _normalize_numbers(numbers, fields, headers, alt_number): # normalize field prefixes if numbers is None: numbers = dict() numbers = {_normalize_field_prefix(f, headers): n for f, n in numbers.items()} # setup output normed_numbers = dict() for f in fields: group, name = f.split('/') if f in numbers: normed_numbers[f] = _normalize_number(f, numbers[f], alt_number) elif f in default_numbers: normed_numbers[f] = _normalize_number(f, default_numbers[f], alt_number) elif group == 'variants': if name in ['numalt', 'svlen', 'is_snp']: # computed fields, special case (for svlen, number depends on ALT) continue elif name.startswith('FILTER_'): normed_numbers[f] = 0 elif name in headers.infos: normed_numbers[f] = _normalize_number(f, headers.infos[name]['Number'], alt_number) else: # fall back to 1 normed_numbers[f] = 1 warnings.warn('no number for field %r, assuming 1' % f) elif group == 'calldata': if name in headers.formats: normed_numbers[f] = _normalize_number(f, headers.formats[name]['Number'], alt_number) else: # fall back to 1 normed_numbers[f] = 1 warnings.warn('no number for field %r, assuming 1' % f) else: raise RuntimeError('unexpected field: %r' % f) return normed_numbers def _normalize_fills(fills, fields, headers): if fills is None: fills = dict() fills = {_normalize_field_prefix(f, headers): v for f, v in fills.items()} # setup output normed_fills = dict() for f in fields: if f in fills: normed_fills[f] = fills[f] return normed_fills def _normalize_samples(samples, headers, types): loc_samples = np.zeros(len(headers.samples), dtype='u1') if samples is None: normed_samples = list(headers.samples) loc_samples.fill(1) else: samples = set(samples) normed_samples = [] for i, s in enumerate(headers.samples): if i in samples: normed_samples.append(s) samples.remove(i) loc_samples[i] = 1 elif s in samples: normed_samples.append(s) samples.remove(s) loc_samples[i] = 1 if len(samples) > 0: warnings.warn('some samples not found, will be ignored: ' + ', '.join(map(repr, sorted(samples)))) t = default_string_dtype if types is not None: t = types.get('samples', t) normed_samples = np.array(normed_samples, dtype=t) return normed_samples, loc_samples def _iter_vcf_stream(stream, fields, types, numbers, alt_number, chunk_length, fills, region, samples): # read VCF headers headers = _read_vcf_headers(stream) # setup samples samples, loc_samples = _normalize_samples(samples=samples, headers=headers, types=types) # setup fields to read if fields is None: # choose default fields fields = list() _add_all_fixed_variants_fields(fields) fields.append('variants/FILTER_PASS') if len(samples) > 0 and 'GT' in headers.formats: fields.append('calldata/GT') else: fields = _normalize_fields(fields=fields, headers=headers, samples=samples) # setup data types types = _normalize_types(types=types, fields=fields, headers=headers) # setup numbers (a.k.a., arity) numbers = _normalize_numbers(numbers=numbers, fields=fields, headers=headers, alt_number=alt_number) # setup fills fills = _normalize_fills(fills=fills, fields=fields, headers=headers) # setup chunks iterator chunks = VCFChunkIterator( stream, chunk_length=chunk_length, headers=headers, fields=fields, types=types, numbers=numbers, fills=fills, region=region, loc_samples=loc_samples ) return fields, samples, headers, chunks # pre-compile some regular expressions _re_filter_header = \ re.compile('##FILTER=<ID=([^,]+),Description="([^"]+)">') _re_info_header = \ re.compile('##INFO=<ID=([^,]+),Number=([^,]+),Type=([^,]+),Description="([^"]+)">') _re_format_header = \ re.compile('##FORMAT=<ID=([^,]+),Number=([^,]+),Type=([^,]+),Description="([^"]+)">') VCFHeaders = namedtuple('VCFHeaders', ['headers', 'filters', 'infos', 'formats', 'samples']) def _read_vcf_headers(stream): # setup headers = [] samples = None filters = dict() infos = dict() formats = dict() # read first header line header = stream.readline() if not PY2: header = str(header, 'ascii') while header and header[0] == '#': headers.append(header) if header.startswith('##FILTER'): match = _re_filter_header.match(header) if match is None: warnings.warn('invalid FILTER header: %r' % header) else: k, d = match.groups() if k in filters: warnings.warn('multiple FILTER headers for %r' % k) filters[k] = {'ID': k, 'Description': d} elif header.startswith('##INFO'): match = _re_info_header.match(header) if match is None: warnings.warn('invalid INFO header: %r' % header) else: k, n, t, d = match.groups() if k in infos: warnings.warn('multiple INFO headers for %r' % k) infos[k] = {'ID': k, 'Number': n, 'Type': t, 'Description': d} elif header.startswith('##FORMAT'): match = _re_format_header.match(header) if match is None: warnings.warn('invalid FORMAT header: %r' % header) else: k, n, t, d = match.groups() if k in formats: warnings.warn('multiple FORMAT headers for %r' % k) formats[k] = {'ID': k, 'Number': n, 'Type': t, 'Description': d} elif header.startswith('#CHROM'): # parse out samples samples = header.strip().split('\t')[9:] break # read next header line header = stream.readline() if not PY2: header = str(header, 'ascii') # check if we saw the mandatory header line or not if samples is None: # can't warn about this, it's fatal raise RuntimeError('VCF file is missing mandatory header line ("#CHROM...")') return VCFHeaders(headers, filters, infos, formats, samples) def _chunk_to_dataframe(fields, chunk): import pandas items = list() for f in fields: a = chunk[f] group, name = f.split('/') assert group == 'variants' if a.dtype.kind == 'S': # always convert strings for pandas - if U then pandas will use object dtype a = a.astype('U') if a.ndim == 1: items.append((name, a)) elif a.ndim == 2: for i in range(a.shape[1]): items.append(('%s_%s' % (name, i+1), a[:, i])) else: warnings.warn('cannot handle array %r with >2 dimensions, skipping' % name) df = pandas.DataFrame.from_items(items) return df
[docs]def vcf_to_dataframe(input, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, log=None): """Read data from a VCF file into a pandas DataFrame. Parameters ---------- input : string {input} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} log : file-like, optional {log} Returns ------- df : pandas.DataFrame """ import pandas # samples requested? # noinspection PyTypeChecker _, fields = _prep_fields_param(fields) # setup fields, _, _, it = iter_vcf_chunks( input=input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, fills=fills, region=region, tabix=tabix, samples=[], transformers=transformers ) # setup progress logging if log is not None: it = _chunk_iter_progress(it, log, prefix='[vcf_to_dataframe]') # read all chunks into a list chunks = [chunk for chunk, _, _, _ in it] # setup output output = None if chunks: # concatenate chunks output = pandas.concat([_chunk_to_dataframe(fields, chunk) for chunk in chunks]) return output
vcf_to_dataframe.__doc__ = vcf_to_dataframe.__doc__.format( input=_doc_param_input, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, log=_doc_param_log, )
[docs]def vcf_to_csv(input, output, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, log=None, **kwargs): """Read data from a VCF file and write out to a comma-separated values (CSV) file. Parameters ---------- input : string {input} output : string {output} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} log : file-like, optional {log} kwargs : keyword arguments All remaining keyword arguments are passed through to pandas.DataFrame.to_csv(). E.g., to write a tab-delimited file, provide `sep='\t'`. """ # samples requested? # noinspection PyTypeChecker _, fields = _prep_fields_param(fields) # setup fields, _, _, it = iter_vcf_chunks( input=input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, fills=fills, region=region, tabix=tabix, samples=[], transformers=transformers ) # setup progress logging if log is not None: it = _chunk_iter_progress(it, log, prefix='[vcf_to_csv]') kwargs['index'] = False for i, (chunk, _, _, _) in enumerate(it): df = _chunk_to_dataframe(fields, chunk) if i == 0: kwargs['header'] = True kwargs['mode'] = 'w' else: kwargs['header'] = False kwargs['mode'] = 'a' df.to_csv(output, **kwargs)
vcf_to_csv.__doc__ = vcf_to_csv.__doc__.format( input=_doc_param_input, output=_doc_param_output, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, log=_doc_param_log, ) def _chunk_to_recarray(fields, chunk): arrays = list() names = list() for f in fields: a = chunk[f] group, name = f.split('/') if a.ndim == 1: arrays.append(a) names.append(name) elif a.ndim == 2: for i in range(a.shape[1]): arrays.append(a[:, i]) names.append('%s_%s' % (name, i+1)) else: warnings.warn('cannot handle arrays with >2 dimensions, ignoring %r' % name) ra = np.rec.fromarrays(arrays, names=names) return ra
[docs]def vcf_to_recarray(input, fields=None, types=None, numbers=None, alt_number=DEFAULT_ALT_NUMBER, fills=None, region=None, tabix='tabix', transformers=None, buffer_size=DEFAULT_BUFFER_SIZE, chunk_length=DEFAULT_CHUNK_LENGTH, log=None): """Read data from a VCF file into a NumPy recarray. Parameters ---------- input : string {input} fields : list of strings, optional {fields} types : dict, optional {types} numbers : dict, optional {numbers} alt_number : int, optional {alt_number} fills : dict, optional {fills} region : string, optional {region} tabix : string, optional {tabix} transformers : list of transformer objects, optional {transformers} buffer_size : int, optional {buffer_size} chunk_length : int, optional {chunk_length} log : file-like, optional {log} Returns ------- ra : np.rec.array """ # samples requested? # noinspection PyTypeChecker _, fields = _prep_fields_param(fields) # setup chunk iterator # N.B., set samples to empty list so we don't get any calldata fields fields, _, _, it = iter_vcf_chunks( input=input, fields=fields, types=types, numbers=numbers, alt_number=alt_number, buffer_size=buffer_size, chunk_length=chunk_length, fills=fills, region=region, tabix=tabix, samples=[], transformers=transformers ) # setup progress logging if log is not None: it = _chunk_iter_progress(it, log, prefix='[vcf_to_recarray]') # read all chunks into a list chunks = [chunk for chunk, _, _, _ in it] # setup output output = None if chunks: # concatenate chunks output = np.concatenate([_chunk_to_recarray(fields, chunk) for chunk in chunks]) return output
vcf_to_recarray.__doc__ = vcf_to_recarray.__doc__.format( input=_doc_param_input, fields=_doc_param_fields, types=_doc_param_types, numbers=_doc_param_numbers, alt_number=_doc_param_alt_number, fills=_doc_param_fills, region=_doc_param_region, tabix=_doc_param_tabix, transformers=_doc_param_transformers, buffer_size=_doc_param_buffer_size, chunk_length=_doc_param_chunk_length, log=_doc_param_log, )