Source code for dit.multivariate.common_informations.base_markov_optimizer

"""
Abstract base classes
"""

from __future__ import division

from abc import abstractmethod

import numpy as np

from ...algorithms import BaseAuxVarOptimizer
from ...utils import unitful
from ..dual_total_correlation import dual_total_correlation
from ..entropy import entropy


class MarkovVarOptimizer(BaseAuxVarOptimizer):
    """
    Abstract base class for constructing auxiliary variables which render a set
    of variables conditionally independent.
    """

    name = ""
    description = ""

    def __init__(self, dist, rvs=None, crvs=None, bound=None, rv_mode=None):
        """
        Initialize the optimizer.

        Parameters
        ----------
        dist : Distribution
            The distribution to compute the auxiliary Markov variable, W, for.
        rvs : list, None
            A list of lists. Each inner list specifies the indexes of the random
            variables to render conditionally independent. If None, then all
            random variables are used, which is equivalent to passing
            `rvs=dist.rvs`.
        crvs : list, None
            A single list of indexes specifying the random variables to
            condition on. If None, then no variables are conditioned on.
        bound : int
            Place an artificial bound on the size of W.
        rv_mode : str, None
            Specifies how to interpret `rvs` and `crvs`. Valid options are:
            {'indices', 'names'}. If equal to 'indices', then the elements of
            `crvs` and `rvs` are interpreted as random variable indices. If
            equal to 'names', the the elements are interpreted as random
            variable names. If `None`, then the value of `dist._rv_mode` is
            consulted, which defaults to 'indices'.
        """
        super(MarkovVarOptimizer, self).__init__(dist, rvs=rvs, crvs=crvs, rv_mode=rv_mode)

        theoretical_bound = self.compute_bound()
        bound = min(bound, theoretical_bound) if bound else theoretical_bound

        rv_bounds = self._shape[1:-1]
        self._pmf_to_match = self._pmf.copy()

        # remove the rvs other than the first, they need to be generated by W
        # in order to satisfy the markov criteria:
        self._pmf = self._pmf.sum(axis=tuple(range(1, len(self._shape)-1)))
        self._shape = self._pmf.shape
        self._all_vars = {0, 1}

        self._full_pmf = self._full_pmf.sum(axis=tuple(range(self._n + 1, len(self._full_shape)-1)))
        self._full_shape = self._full_pmf.shape
        self._full_vars = tuple(range(self._n + 2))

        # back up where the rvs and crvs are, they need to be reflect
        # the above removals for the sake of adding auxvars:
        self.__rvs, self._rvs = self._rvs, {0}
        self.__crvs, self._crvs = self._crvs, {1}

        self._construct_auxvars([({0, 1}, bound)] +
                                [({1, 2}, s) for s in rv_bounds])

        # put rvs, crvs back:
        self._rvs = self.__rvs
        self._crvs = self.__crvs
        del self.__rvs
        del self.__crvs

        self._W = {1 + len(self._aux_vars)}

        # The constraint that the joint doesn't change.
        self.constraints += [{'type': 'eq',
                              'fun': self.constraint_match_joint,
                              },
                             ]

        self._default_hops = 5

        self._additional_options = {'options': {'maxiter': 1000,
                                                'ftol': 1e-6,
                                                'eps': 1.4901161193847656e-9,
                                                }
                                    }

    @abstractmethod
    def compute_bound(self):
        """
        Return a bound on the cardinality of the auxiliary variable.

        Returns
        -------
        bound : int
            The bound on the size of W.
        """
        pass

    def construct_joint(self, x):
        """
        Construct the joint distribution.

        Parameters
        ----------
        x : np.ndarray
            An optimization vector.

        Returns
        -------
        joint : np.ndarray
            The joint distribution resulting from the distribution passed
            in and the optimization vector.
        """
        joint = super(MarkovVarOptimizer, self).construct_joint(x)
        joint = np.moveaxis(joint, 1, -1)  # move crvs
        joint = np.moveaxis(joint, 1, -1)  # move W

        return joint

    def construct_full_joint(self, x):
        """
        Construct the joint distribution.

        Parameters
        ----------
        x : np.ndarray
            An optimization vector.

        Returns
        -------
        joint : np.ndarray
            The joint distribution resulting from the distribution passed
            in and the optimization vector.
        """
        joint = super(MarkovVarOptimizer, self).construct_full_joint(x)
        joint = np.moveaxis(joint, self._n + 1, -1)  # move crvs
        joint = np.moveaxis(joint, self._n + 1, -1)  # move W
        return joint

    def constraint_match_joint(self, x):
        """
        Ensure that the joint distribution represented by the optimization
        vector matches that of the distribution.

        Parameters
        ----------
        x : np.ndarray
            An optimization vector.
        """
        joint = self.construct_joint(x)
        joint = joint.sum(axis=-1)  # marginalize out w

        delta = (100*(joint - self._pmf_to_match)**2).sum()

        return delta

    @classmethod
    def functional(cls):
        """
        Construct a functional form of the optimizer.
        """
        @unitful
        def common_info(dist, rvs=None, crvs=None, niter=None, maxiter=1000, polish=1e-6, bound=None, rv_mode=None):
            dtc = dual_total_correlation(dist, rvs, crvs, rv_mode)
            ent = entropy(dist, rvs, crvs, rv_mode)
            if np.isclose(dtc, ent):
                # Common informations are bound between the dual total correlation and the joint
                # entropy. Therefore, if the two are equal, the common information is equal to them
                # as well.
                return dtc

            ci = cls(dist, rvs, crvs, bound, rv_mode)
            ci.optimize(niter=niter, maxiter=maxiter, polish=polish)
            return ci.objective(ci._optima)

        common_info.__doc__ = \
        """
        Computes the {name} common information, {description}.

        Parameters
        ----------
        dist : Distribution
            The distribution for which the {name} common information will be
            computed.
        rvs : list, None
            A list of lists. Each inner list specifies the indexes of the random
            variables used to calculate the {name} common information. If None,
            then it calculated over all random variables, which is equivalent to
            passing `rvs=dist.rvs`.
        crvs : list, None
            A single list of indexes specifying the random variables to condition
            on. If None, then no variables are conditioned on.
        niter : int > 0
            Number of basin hoppings to perform during the optimization.
        maxiter : int > 0
            The number of iterations of the optimization subroutine to perform.
        polish : False, float
            Whether to polish the result or not. If a float, this will perform a
            second optimization seeded with the result of the first, but with
            smaller tolerances and probabilities below polish set to 0. If
            False, don't polish.
        bound : int
            Bound the size of the Markov variable.
        rv_mode : str, None
            Specifies how to interpret `rvs` and `crvs`. Valid options are:
            {{'indices', 'names'}}. If equal to 'indices', then the elements of
            `crvs` and `rvs` are interpreted as random variable indices. If equal
            to 'names', the the elements are interpreted as random variable names.
            If `None`, then the value of `dist._rv_mode` is consulted, which
            defaults to 'indices'.

        Returns
        -------
        ci : float
            The {name} common information.
        """.format(name=cls.name, description=cls.description)

        return common_info


class MinimizingMarkovVarOptimizer(MarkovVarOptimizer):  # pragma: no cover
    """
    Abstract base class for an optimizer which additionally minimizes the size
    of the auxiliary variable.
    """

    def optimize(self, x0=None, niter=None, maxiter=None, polish=1e-6, callback=False, minimize=True, min_niter=15):
        """
        Parameters
        ----------
        x0 : np.ndarray, None
            The vector to initialize the optimization with. If None, a random
            vector is used.
        niter : int
            The number of times to basin hop in the optimization.
        maxiter : int
            The number of inner optimizer steps to perform.
        polish : False, float
            Whether to polish the result or not. If a float, this will perform a
            second optimization seeded with the result of the first, but with
            smaller tolerances and probabilities below polish set to 0. If
            False, don't polish.
        callback : bool
            Whether to utilize a callback or not.
        minimize : bool
            Whether to minimize the auxiliary variable or not.
        min_niter : int
            The number of basin hops to make during the minimization of the common variable.
        """
        # call the normal optimizer
        super(MinimizingMarkovVarOptimizer, self).optimize(x0=x0,
                                                           niter=niter,
                                                           maxiter=maxiter,
                                                           polish=False,
                                                           callback=callback)
        if minimize:
            # minimize the entropy of W
            self._post_process(style='entropy', minmax='min', niter=min_niter, maxiter=maxiter)
        if polish:
            self._polish(cutoff=polish)