Source code for allel.util

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
from contextlib import contextmanager
from functools import update_wrapper
import atexit
import os


import numpy as np


from allel.compat import string_types


@contextmanager
def ignore_invalid():
    err = np.seterr(invalid='ignore')
    try:
        yield
    finally:
        np.seterr(**err)
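
# Usage sketch (illustrative example, not part of the original module):
# temporarily silence numpy's 'invalid value' warnings, restoring the
# previous error state on exit.
#
#     with ignore_invalid():
#         y = np.sqrt(np.array([1.0, -1.0]))  # yields nan, no RuntimeWarning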


def check_array_like(a, *ndims, **kwargs):
    """Check that `a` is array-like, coercing it via the 'default'
    constructor keyword if necessary, and that it has an allowed number of
    dimensions."""
    if not hasattr(a, 'ndim'):
        cls = kwargs.pop('default', np.asarray)
        a = cls(a, **kwargs)
    if a.ndim not in ndims:
        raise ValueError('invalid number of dimensions: expected one of %s, '
                         'found %s' % (ndims, a.ndim))
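
# Usage sketch (illustrative example): non-array inputs are coerced via the
# 'default' constructor keyword before the dimensionality check.
#
#     check_array_like([[1, 2], [3, 4]], 2)  # passes silently
#     check_array_like([1, 2, 3], 2)         # raises ValueError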


def asarray_ndim(a, *ndims, **kwargs):
    """Ensure numpy array.

    Parameters
    ----------
    a : array_like
    *ndims : int, optional
        Allowed values for number of dimensions.
    allow_none : bool, optional
        If True, allow `a` to be None and return None unchanged.
    **kwargs
        Passed through to :func:`numpy.array`.

    Returns
    -------
    a : numpy.ndarray

    """
    allow_none = kwargs.pop('allow_none', False)
    kwargs.setdefault('copy', False)
    if a is None and allow_none:
        return None
    a = np.array(a, **kwargs)
    if a.ndim not in ndims:
        raise ValueError('invalid number of dimensions: expected one of %s, '
                         'found %s' % (ndims, a.ndim))
    return a
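
# Usage sketch (illustrative example): asarray_ndim both converts and
# validates, returning the array for further use.
#
#     pos = asarray_ndim([2, 5, 8], 1)             # ndarray, shape (3,)
#     gn = asarray_ndim(None, 2, allow_none=True)  # None passes through
#     asarray_ndim([2, 5, 8], 2)                   # raises ValueError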


def check_dim0_aligned(*arrays):
    check_dim_aligned(0, *arrays)


def check_dim_aligned(dim, *arrays):
    a = arrays[0]
    for b in arrays[1:]:
        if b.shape[dim] != a.shape[dim]:
            raise ValueError(
                'arrays do not have matching length for dimension %s' % dim
            )
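
# Usage sketch (illustrative example): all arrays are compared against the
# first along the given dimension.
#
#     a = np.zeros((3, 2))
#     b = np.zeros((3, 5))
#     check_dim0_aligned(a, b)    # passes, both have 3 rows
#     check_dim_aligned(1, a, b)  # raises ValueError, 2 != 5 columns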


def check_same_ndim(*arrays):
    a = arrays[0]
    for b in arrays[1:]:
        if len(b.shape) != len(a.shape):
            raise ValueError(
                'arrays do not have same number of dimensions'
            )


def check_equal_length(a, *others):
    n = len(a)
    for b in others:
        if len(b) != n:
            raise ValueError('sequences do not have matching length')
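
# Usage sketch (illustrative example): works on any sized sequences, not
# just arrays.
#
#     check_equal_length([1, 2, 3], (4, 5, 6))  # passes silently
#     check_equal_length([1, 2, 3], [7, 8])     # raises ValueError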


def resize_dim1(a, l, fill=0):
    if a.shape[1] < l:
        # pad out the second dimension with the fill value
        newshape = a.shape[0], l
        b = np.full(newshape, fill, dtype=a.dtype)
        b[:, :a.shape[1]] = a
        return b
    else:
        return a
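
# Usage sketch (illustrative example): pad the second dimension out to a
# minimum size, leaving arrays that are already wide enough untouched.
#
#     a = np.array([[1, 2], [3, 4]])
#     resize_dim1(a, 4, fill=-1)
#     # -> array([[ 1,  2, -1, -1],
#     #           [ 3,  4, -1, -1]])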


def ensure_dim1_aligned(*arrays, **kwargs):
    fill = kwargs.get('fill', 0)
    n = max(a.shape[1] for a in arrays)
    arrays = [resize_dim1(a, n, fill=fill) for a in arrays]
    return arrays
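
# Usage sketch (illustrative example): pad a collection of arrays so they
# all share the width of the widest one.
#
#     x = np.ones((2, 2), dtype=int)
#     y = np.ones((2, 4), dtype=int)
#     x2, y2 = ensure_dim1_aligned(x, y)  # x2.shape == (2, 4), y unchanged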


def ensure_square(dist):
    from scipy.spatial.distance import squareform
    dist = asarray_ndim(dist, 1, 2)
    if dist.ndim == 1:
        dist = squareform(dist)
    else:
        if dist.shape[0] != dist.shape[1]:
            raise ValueError('distance matrix is not square')
    return dist
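
# Usage sketch (illustrative example): accept either a condensed distance
# vector (as returned by scipy.spatial.distance.pdist) or a full square
# matrix.
#
#     condensed = np.array([1.0, 2.0, 3.0])   # distances among 3 points
#     ensure_square(condensed).shape          # -> (3, 3)
#     ensure_square(np.zeros((3, 3))).shape   # -> (3, 3)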


class _HashedSeq(list):

    __slots__ = 'hashvalue'

    # noinspection PyShadowingBuiltins,PyMissingConstructor
    def __init__(self, tup, hash=hash):
        self[:] = tup
        self.hashvalue = hash(tup)

    def __hash__(self):
        return self.hashvalue


# noinspection PyShadowingBuiltins
def _make_key(args, kwds, typed,
              kwd_mark=('__kwargs__',),
              fasttypes={int, str, frozenset, type(None)},
              sorted=sorted, tuple=tuple, type=type, len=len):
    key = args
    if kwds:
        sorted_items = sorted(kwds.items())
        key += kwd_mark
        for item in sorted_items:
            key += item
    if typed:
        key += tuple(type(v) for v in args)
        if kwds:
            key += tuple(type(v) for k, v in sorted_items)
    elif len(key) == 1 and type(key[0]) in fasttypes:
        return key[0]
    return _HashedSeq(key)
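
# Note: _HashedSeq and _make_key mirror the key-making machinery inside the
# standard library's functools.lru_cache, but with a printable kwd_mark so
# that unhashed keys remain readable as HDF5 group names. Key construction
# sketch (illustrative):
#
#     _make_key((3,), {}, typed=False)            # -> 3 (fast path)
#     _make_key((3, 'x'), {'n': 2}, typed=False)
#     # -> _HashedSeq of (3, 'x', '__kwargs__', 'n', 2)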


def _hdf5_cache_act(filepath, parent, container, key, names, no_cache,
                    user_function, args, kwargs, h5dcreate_kwargs):
    import h5py

    # open the HDF5 file
    with h5py.File(filepath, mode='a') as h5f:

        # find parent group
        if parent is None:
            # use root group
            h5g_parent = h5f
        else:
            h5g_parent = h5f.require_group(parent)

        # find cache container group
        h5g_container = h5g_parent.require_group(container)

        # find cache group
        h5g = h5g_container.require_group(key)

        # call user function and (re)build cache
        if no_cache or '__success__' not in h5g.attrs:

            # reset success mark if present
            if '__success__' in h5g.attrs:
                del h5g.attrs['__success__']

            # compute result
            result = user_function(*args, **kwargs)

            # handle tuple of return values
            if isinstance(result, tuple):

                # determine dataset names
                if names is None:
                    names = ['f%02d' % i for i in range(len(result))]
                elif len(names) < len(result):
                    names = list(names) + ['f%02d' % i
                                           for i in range(len(names),
                                                          len(result))]

                # save data
                for n, r in zip(names, result):
                    if n in h5g:
                        del h5g[n]
                    if np.isscalar(r):
                        h5g.create_dataset(n, data=r)
                    else:
                        h5g.create_dataset(n, data=r, **h5dcreate_kwargs)

            # handle single return value
            else:

                # determine dataset name
                if names is None:
                    n = 'data'
                elif isinstance(names, string_types):
                    n = names
                elif len(names) > 0:
                    n = names[0]
                else:
                    n = 'data'

                # save data
                if n in h5g:
                    del h5g[n]
                if np.isscalar(result):
                    h5g.create_dataset(n, data=result)
                else:
                    h5g.create_dataset(n, data=result,
                                       **h5dcreate_kwargs)

            # mark success
            h5g.attrs['__success__'] = True

        # load from cache
        else:

            # determine dataset names
            if names is None:
                names = sorted(h5g.keys())
            elif isinstance(names, string_types):
                names = (names,)

            # load result from cache
            if len(names) == 1:
                result = h5g[names[0]]
                result = result[:] if len(result.shape) > 0 else result[()]
            else:
                result = tuple(h5g[n] for n in names)
                result = tuple(r[:] if len(r.shape) > 0 else r[()]
                               for r in result)

        return result
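
# Layout sketch (illustrative, derived from the code above): a cached call is
# stored under /<parent>/<container>/<key>/<dataset name>, with a
# '__success__' attribute on the key group marking a complete cache entry,
# e.g. for the docstring examples below:
#
#     /foo/3/data           # single return value of foo(3)
#     /bar/3/f00 ... f02    # tuple return values of bar(3)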


def hdf5_cache(filepath=None, parent=None, group=None, names=None,
               typed=False, hashed_key=False, **h5dcreate_kwargs):
    """HDF5 cache decorator.

    Parameters
    ----------
    filepath : string, optional
        Path to HDF5 file. If None a temporary file name will be used.
    parent : string, optional
        Path to group within HDF5 file to use as parent. If None the root
        group will be used.
    group : string, optional
        Path to group within HDF5 file, relative to parent, to use as
        container for cached data. If None the name of the wrapped function
        will be used.
    names : sequence of strings, optional
        Name(s) of dataset(s). If None, default names will be 'f00', 'f01',
        etc.
    typed : bool, optional
        If True, arguments of different types will be cached separately.
        For example, f(3.0) and f(3) will be treated as distinct calls with
        distinct results.
    hashed_key : bool, optional
        If False (default) the key will not be hashed, which makes for
        readable cache group names. If True the key will be hashed, however
        note that on Python >= 3.3 the hash value will not be the same
        between sessions unless the environment variable PYTHONHASHSEED has
        been set to the same value.

    Returns
    -------
    decorator : function

    Examples
    --------
    Without any arguments, will cache using a temporary HDF5 file::

        >>> import allel
        >>> @allel.util.hdf5_cache()
        ... def foo(n):
        ...     print('executing foo')
        ...     return np.arange(n)
        ...
        >>> foo(3)
        executing foo
        array([0, 1, 2])
        >>> foo(3)
        array([0, 1, 2])
        >>> foo.cache_filepath # doctest: +SKIP
        '/tmp/tmp_jwtwgjz'

    Supports multiple return values, including scalars, e.g.::

        >>> @allel.util.hdf5_cache()
        ... def bar(n):
        ...     print('executing bar')
        ...     a = np.arange(n)
        ...     return a, a**2, n**2
        ...
        >>> bar(3)
        executing bar
        (array([0, 1, 2]), array([0, 1, 4]), 9)
        >>> bar(3)
        (array([0, 1, 2]), array([0, 1, 4]), 9)

    Names can also be specified for the datasets, e.g.::

        >>> @allel.util.hdf5_cache(names=['z', 'x', 'y'])
        ... def baz(n):
        ...     print('executing baz')
        ...     a = np.arange(n)
        ...     return a, a**2, n**2
        ...
        >>> baz(3)
        executing baz
        (array([0, 1, 2]), array([0, 1, 4]), 9)
        >>> baz(3)
        (array([0, 1, 2]), array([0, 1, 4]), 9)

    """

    # initialise HDF5 file path
    if filepath is None:
        import tempfile
        filepath = tempfile.mktemp(prefix='scikit_allel_', suffix='.h5')
        atexit.register(os.remove, filepath)

    # initialise defaults for dataset creation
    h5dcreate_kwargs.setdefault('chunks', True)

    def decorator(user_function):

        # setup the name for the cache container group
        if group is None:
            container = user_function.__name__
        else:
            container = group

        def wrapper(*args, **kwargs):

            # load from cache or not
            no_cache = kwargs.pop('no_cache', False)

            # compute a key from the function arguments
            key = _make_key(args, kwargs, typed)
            if hashed_key:
                key = str(hash(key))
            else:
                key = str(key).replace('/', '__slash__')

            return _hdf5_cache_act(filepath, parent, container, key, names,
                                   no_cache, user_function, args, kwargs,
                                   h5dcreate_kwargs)

        wrapper.cache_filepath = filepath
        return update_wrapper(wrapper, user_function)

    return decorator