Source code for allel.util

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
from contextlib import contextmanager
from functools import update_wrapper
import atexit
import os


import numpy as np


from allel.compat import string_types


@contextmanager
def ignore_invalid():
    err = np.seterr(invalid='ignore')
    try:
        yield
    finally:
        np.seterr(**err)
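
# Illustrative example (not part of the original module): ``ignore_invalid``
# suppresses numpy's 'invalid value' warning, e.g. for a 0/0 division, and
# restores the previous error state on exit:
#
#     >>> with ignore_invalid():
#     ...     x = np.zeros(2) / np.zeros(2)
#     >>> x
#     array([nan, nan])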


def check_array_like(a, *ndims, **kwargs):
    if not hasattr(a, 'ndim'):
        cls = kwargs.pop('default', np.asarray)
        a = cls(a, **kwargs)
    if a.ndim not in ndims:
        raise ValueError('invalid number of dimensions: %s' % a.ndim)


def asarray_ndim(a, *ndims, **kwargs):
    """Ensure numpy array.

    Parameters
    ----------
    a : array_like
    *ndims : int, optional
        Allowed values for number of dimensions.
    **kwargs
        Passed through to :func:`numpy.array`.

    Returns
    -------
    a : numpy.ndarray

    """
    allow_none = kwargs.pop('allow_none', False)
    kwargs.setdefault('copy', False)
    if a is None and allow_none:
        return None
    a = np.array(a, **kwargs)
    if a.ndim not in ndims:
        if len(ndims) > 1:
            expect_str = 'one of %s' % str(ndims)
        else:
            # noinspection PyUnresolvedReferences
            expect_str = '%s' % ndims[0]
        raise TypeError('bad number of dimensions: expected %s; found %s' %
                        (expect_str, a.ndim))
    return a
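
# Illustrative example (not part of the original module): coerce input to a
# numpy array and validate its dimensionality in one step:
#
#     >>> asarray_ndim([[1, 2], [3, 4]], 2)
#     array([[1, 2],
#            [3, 4]])
#     >>> asarray_ndim([1, 2, 3], 2)
#     Traceback (most recent call last):
#       ...
#     TypeError: bad number of dimensions: expected 2; found 1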


def check_ndim(a, ndim):
    if a.ndim != ndim:
        raise TypeError('bad number of dimensions: expected %s; found %s' % (ndim, a.ndim))


def check_shape(a, shape):
    if a.shape != shape:
        raise TypeError('bad shape: expected %s; found %s' % (shape, a.shape))


def check_dtype(a, *dtypes):
    dtypes = [np.dtype(t) for t in dtypes]
    if a.dtype not in dtypes:
        raise TypeError('bad dtype: expected one of %s; found %s' % (dtypes, a.dtype))


def check_dtype_kind(a, *kinds):
    if a.dtype.kind not in kinds:
        raise TypeError('bad dtype kind: expected one of %s; found %s' % (kinds, a.dtype.kind))


def check_integer_dtype(a):
    check_dtype_kind(a, 'u', 'i')


def check_dim0_aligned(*arrays):
    check_dim_aligned(0, *arrays)


def check_dim1_aligned(*arrays):
    check_dim_aligned(1, *arrays)


def check_dim_aligned(dim, *arrays):
    a = arrays[0]
    for b in arrays[1:]:
        if b.shape[dim] != a.shape[dim]:
            raise ValueError(
                'arrays do not have matching length for dimension %s' % dim
            )
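
# Illustrative example (not part of the original module): the alignment check
# passes silently and raises ValueError on mismatch:
#
#     >>> check_dim0_aligned(np.zeros((3, 2)), np.zeros((3, 7)))
#     >>> check_dim0_aligned(np.zeros((3, 2)), np.zeros((4, 2)))
#     Traceback (most recent call last):
#       ...
#     ValueError: arrays do not have matching length for dimension 0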


def check_same_ndim(*arrays):
    a = arrays[0]
    for b in arrays[1:]:
        if len(b.shape) != len(a.shape):
            raise ValueError(
                'arrays do not have same number of dimensions'
            )


def check_equal_length(a, *others):
    expected_length = len(a)
    for b in others:
        if len(b) != expected_length:
            raise ValueError('sequences do not have matching length')


def resize_dim1(a, l, fill=0):
    if a.shape[1] < l:
        newshape = a.shape[0], l
        b = np.zeros(newshape, dtype=a.dtype)
        if fill != 0:
            b.fill(fill)
        b[:, :a.shape[1]] = a
        return b
    else:
        return a
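
# Illustrative example (not part of the original module): pad the second
# dimension out to the requested length, filling new columns with ``fill``:
#
#     >>> resize_dim1(np.array([[1, 2], [3, 4]]), 3, fill=-1)
#     array([[ 1,  2, -1],
#            [ 3,  4, -1]])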


def ensure_dim1_aligned(*arrays, **kwargs):
    fill = kwargs.get('fill', 0)
    dim1_length = max(a.shape[1] for a in arrays)
    arrays = [resize_dim1(a, dim1_length, fill=fill) for a in arrays]
    return arrays
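
# Illustrative example (not part of the original module): shorter arrays are
# padded so all inputs share the same dim1 length:
#
#     >>> a, b = np.zeros((2, 2)), np.ones((2, 4))
#     >>> [x.shape for x in ensure_dim1_aligned(a, b)]
#     [(2, 4), (2, 4)]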


def ensure_square(dist):
    from scipy.spatial.distance import squareform
    dist = asarray_ndim(dist, 1, 2)
    if dist.ndim == 1:
        dist = squareform(dist)
    else:
        if dist.shape[0] != dist.shape[1]:
            raise ValueError('distance matrix is not square')
    return dist
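
# Illustrative example (not part of the original module): a condensed
# distance vector for 3 points expands to the 3x3 square form (exact float
# formatting may vary with numpy version):
#
#     >>> ensure_square(np.array([1., 2., 3.]))
#     array([[0., 1., 2.],
#            [1., 0., 3.],
#            [2., 3., 0.]])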


class _HashedSeq(list):

    __slots__ = 'hashvalue'

    # noinspection PyShadowingBuiltins,PyMissingConstructor
    def __init__(self, tup, hash=hash):
        self[:] = tup
        self.hashvalue = hash(tup)

    def __hash__(self):
        return self.hashvalue


# noinspection PyShadowingBuiltins
def _make_key(args, kwds, typed,
              kwd_mark=('__kwargs__',),
              fasttypes={int, str, frozenset, type(None)},
              sorted=sorted, tuple=tuple, type=type, len=len):
    key = args
    if kwds:
        sorted_items = sorted(kwds.items())
        key += kwd_mark
        for item in sorted_items:
            key += item
    if typed:
        key += tuple(type(v) for v in args)
        if kwds:
            # noinspection PyUnboundLocalVariable
            key += tuple(type(v) for _, v in sorted_items)
    if len(key) == 1 and type(key[0]) in fasttypes:
        return key[0]
    return _HashedSeq(key)
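
# Illustrative example (not part of the original module): a single fast-type
# positional argument is returned as-is, while keyword arguments are folded
# into the key behind the ``__kwargs__`` marker:
#
#     >>> _make_key((3,), {}, False)
#     3
#     >>> list(_make_key((3,), {'n': 2}, False))
#     [3, '__kwargs__', 'n', 2]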


def _hdf5_cache_act(filepath, parent, container, key, names, no_cache,
                    user_function, args, kwargs, h5dcreate_kwargs):
    import h5py

    # open the HDF5 file
    with h5py.File(filepath, mode='a') as h5f:

        # find parent group
        if parent is None:
            # use root group
            h5g_parent = h5f
        else:
            h5g_parent = h5f.require_group(parent)

        # find cache container group
        h5g_container = h5g_parent.require_group(container)

        # find cache group
        h5g = h5g_container.require_group(key)

        # call user function and (re)build cache
        if no_cache or '__success__' not in h5g.attrs:

            # reset success mark if present
            if '__success__' in h5g.attrs:
                del h5g.attrs['__success__']

            # compute result
            result = user_function(*args, **kwargs)

            # handle tuple of return values
            if isinstance(result, tuple):

                # determine dataset names
                if names is None:
                    names = ['f%02d' % i for i in range(len(result))]
                elif len(names) < len(result):
                    names = list(names) + ['f%02d' % i
                                           for i in range(len(names),
                                                          len(result))]

                # save data
                for n, r in zip(names, result):
                    if n in h5g:
                        del h5g[n]
                    if np.isscalar(r):
                        h5g.create_dataset(n, data=r)
                    else:
                        h5g.create_dataset(n, data=r, **h5dcreate_kwargs)

            # handle single return value
            else:

                # determine dataset name
                if names is None:
                    n = 'data'
                elif isinstance(names, string_types):
                    n = names
                elif len(names) > 0:
                    n = names[0]
                else:
                    n = 'data'

                # save data
                if n in h5g:
                    del h5g[n]
                if np.isscalar(result):
                    h5g.create_dataset(n, data=result)
                else:
                    h5g.create_dataset(n, data=result,
                                       **h5dcreate_kwargs)

            # mark success
            h5g.attrs['__success__'] = True

        # load from cache
        else:

            # determine dataset names
            if names is None:
                names = sorted(h5g.keys())
            elif isinstance(names, string_types):
                names = (names,)

            # load result from cache
            if len(names) == 1:
                result = h5g[names[0]]
                result = result[:] if len(result.shape) > 0 else result[()]
            else:
                result = tuple(h5g[n] for n in names)
                result = tuple(r[:] if len(r.shape) > 0 else r[()]
                               for r in result)

        return result


def hdf5_cache(filepath=None, parent=None, group=None, names=None,
               typed=False, hashed_key=False, **h5dcreate_kwargs):
    """HDF5 cache decorator.

    Parameters
    ----------
    filepath : string, optional
        Path to HDF5 file. If None a temporary file name will be used.
    parent : string, optional
        Path to group within HDF5 file to use as parent. If None the root
        group will be used.
    group : string, optional
        Path to group within HDF5 file, relative to parent, to use as
        container for cached data. If None the name of the wrapped function
        will be used.
    names : sequence of strings, optional
        Name(s) of dataset(s). If None, default names will be 'f00', 'f01',
        etc.
    typed : bool, optional
        If True, arguments of different types will be cached separately.
        For example, f(3.0) and f(3) will be treated as distinct calls with
        distinct results.
    hashed_key : bool, optional
        If False (default) the key will not be hashed, which makes for
        readable cache group names. If True the key will be hashed, however
        note that on Python >= 3.3 the hash value will not be the same
        between sessions unless the environment variable PYTHONHASHSEED has
        been set to the same value.

    Returns
    -------
    decorator : function

    Examples
    --------
    Without any arguments, will cache using a temporary HDF5 file::

        >>> import allel
        >>> @allel.util.hdf5_cache()
        ... def foo(n):
        ...     print('executing foo')
        ...     return np.arange(n)
        ...
        >>> foo(3)
        executing foo
        array([0, 1, 2])
        >>> foo(3)
        array([0, 1, 2])
        >>> foo.cache_filepath  # doctest: +SKIP
        '/tmp/tmp_jwtwgjz'

    Supports multiple return values, including scalars, e.g.::

        >>> @allel.util.hdf5_cache()
        ... def bar(n):
        ...     print('executing bar')
        ...     a = np.arange(n)
        ...     return a, a**2, n**2
        ...
        >>> bar(3)
        executing bar
        (array([0, 1, 2]), array([0, 1, 4]), 9)
        >>> bar(3)
        (array([0, 1, 2]), array([0, 1, 4]), 9)

    Names can also be specified for the datasets, e.g.::

        >>> @allel.util.hdf5_cache(names=['z', 'x', 'y'])
        ... def baz(n):
        ...     print('executing baz')
        ...     a = np.arange(n)
        ...     return a, a**2, n**2
        ...
        >>> baz(3)
        executing baz
        (array([0, 1, 2]), array([0, 1, 4]), 9)
        >>> baz(3)
        (array([0, 1, 2]), array([0, 1, 4]), 9)

    """

    # initialise HDF5 file path
    if filepath is None:
        import tempfile
        filepath = tempfile.mktemp(prefix='scikit_allel_', suffix='.h5')
        atexit.register(os.remove, filepath)

    # initialise defaults for dataset creation
    h5dcreate_kwargs.setdefault('chunks', True)

    def decorator(user_function):

        # setup the name for the cache container group
        if group is None:
            container = user_function.__name__
        else:
            container = group

        def wrapper(*args, **kwargs):

            # load from cache or not
            no_cache = kwargs.pop('no_cache', False)

            # compute a key from the function arguments
            key = _make_key(args, kwargs, typed)
            if hashed_key:
                key = str(hash(key))
            else:
                key = str(key).replace('/', '__slash__')

            return _hdf5_cache_act(filepath, parent, container, key, names,
                                   no_cache, user_function, args, kwargs,
                                   h5dcreate_kwargs)

        wrapper.cache_filepath = filepath
        return update_wrapper(wrapper, user_function)

    return decorator

def contains_newaxis(item):
    if item is None:
        return True
    elif item is np.newaxis:
        return True
    elif isinstance(item, tuple):
        return any((i is None or i is np.newaxis) for i in item)
    return False


def check_ploidy(actual, expect):
    if expect != actual:
        raise ValueError(
            'expected ploidy %s, found %s' % (expect, actual)
        )


def check_min_samples(actual, expect):
    if actual < expect:
        raise ValueError(
            'expected at least %s samples, found %s' % (expect, actual)
        )


def check_type(obj, expected):
    if not isinstance(obj, expected):
        raise TypeError('bad argument type, expected %s, found %s' %
                        (expected, type(obj)))
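
# Illustrative example (not part of the original module): ``contains_newaxis``
# detects None/np.newaxis anywhere in an indexing expression:
#
#     >>> contains_newaxis((slice(None), np.newaxis))
#     True
#     >>> contains_newaxis((0, slice(None)))
#     False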