#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module defining NumPy array-based distribution classes.
One of the features of joint distributions is that we can marginalize them.
This requires that we are able to construct smaller outcomes from larger
outcomes. For example, an outcome like '10101' might become '010' if the first
and last random variables are marginalized. Given the alphabet of the joint
distribution, we can construct a tuple such as ('0','1','0') using
itertools.product, but we really want an outcome which is a string. For
tuples and other similar containers, we just pass the tuple to the outcome
constructor. This technique does not work for strings, as str(('0','1','0'))
yields "('0', '1', '0')". So we have to handle strings separately.
Note:
For dictionaries...
"k in d" being True means d[k] is a valid operation.
"k in d" being False means d[k] is a KeyError.
d[k] describes the underlying data structure.
__in__ describes the underlying data structure.
__iter__ describes the underlying data structure.
__len__ describes the underlying data structure.
For default dictionaries...
"k in d" being True means d[k] is a valid operation.
"k in d" being False means d[k] will modify the data structure to
make the operation valid. So "k in d" is True afterwards.
d[k] describes the underlying data structure.
__in__ describes the underlying data structure.
__iter__ describes the underlying data structure.
__len__ describes the underlying data structure.
For distributions...
"e in d" being True means d[e] is a valid operation.
"e in d" being False says nothing about d[e].
d[e] will be valid if e is in the sample space.
d[e] will raise an InvalidOutcome if e is not in the sample space.
d[e] does not describe the underlying data structure.
It provides a view of the dense data structure.
With defaultdict, if e not in d, then d[e] will add it.
With distributions, we don't want the pmf changing size
just because we queried it. The size will change only on
assignment.
__in__ describes the underlying data structure.
__iter__ describes the underlying data structure.
__len__ describes the underlying data structure.
For scalar distributions, the sample space is the alphabet and the alphabet
is a single set. For (joint) distributions, the sample space is provided
at initialization and the alphabet is a tuple of alphabets for each random
variable. The alphabet for each random variable is a tuple.
As of now, dit does not support mixed-type alphabets within a single r.v.
So you can have outcomes like:
(0, '0'), (1, '1')
but not like:
(0, '0'), (1, 1)
This has to do with sorting the alphabets. Probably this can be relaxed.
"""
from collections import defaultdict
from operator import itemgetter
import itertools
import numpy as np
from six.moves import map, range, zip # pylint: disable=redefined-builtin
from .npscalardist import ScalarDistribution
from .helpers import (
construct_alphabets,
get_outcome_ctor,
get_product_func,
parse_rvs,
reorder,
RV_MODES,
)
from .samplespace import SampleSpace, CartesianProduct
from .exceptions import (
InvalidDistribution, InvalidOutcome, ditException
)
from .math import get_ops, LinearOperations
from .params import ditParams
def _make_distribution(outcomes, pmf, base,
sample_space=None, prng=None, sparse=True):
"""
An unsafe, but faster, initialization for distributions.
If used incorrectly, the data structure will be inconsistent.
This function can be useful when you are creating many distributions
in a loop and can guarantee that:
0) all outcomes are of the same type (eg tuple, str) and length.
1) the sample space is in the desired order.
1) outcomes and pmf are in the same order as the sample space.
[Thus, `pmf` should not be a dictionary.]
This function will not order the sample space, nor will it reorder outcomes
or pmf. It will not forcibly make outcomes and pmf to be sparse or dense.
It will simply declare the distribution to be sparse or dense. The
distribution is not validated either.
Returns
-------
d : Distribution
The new distribution.
"""
d = Distribution.__new__(Distribution)
# Call init function of BaseDistribution, not of Distribution.
# This sets the prng.
super(ScalarDistribution, d).__init__(prng)
d._meta['is_joint'] = True
d._meta['is_numerical'] = True
d._meta['is_sparse'] = None
if base is None:
# Assume default base.
base = ditParams['base']
d.ops = get_ops(base)
## Set the outcome class, ctor, and product function.
## Assumption: the class of each outcome is the same.
klass = outcomes[0].__class__
d._outcome_class = klass
d._outcome_ctor = get_outcome_ctor(klass)
d._product = get_product_func(klass)
# Force the distribution to be numerical and a NumPy array.
d.pmf = np.asarray(pmf, dtype=float)
# Tuple outcomes, and an index.
d.outcomes = tuple(outcomes)
d._outcomes_index = dict(zip(outcomes, range(len(outcomes))))
# Alphabet
d.alphabet = tuple(construct_alphabets(outcomes))
# Sample space.
if sample_space is None:
d._sample_space = CartesianProduct(d.alphabet, d._product)
elif isinstance(sample_space, SampleSpace):
d._sample_space = sample_space
else:
d._sample_space = SampleSpace(outcomes)
# Set the mask
d._mask = d._new_mask()
d._meta['is_sparse'] = sparse
d.rvs = [[i] for i in range(d.outcome_length())]
return d
class Distribution(ScalarDistribution):
"""
A numerical distribution for joint random variables.
Meta Properties
---------------
is_joint
Boolean specifying if the pmf represents a joint distribution.
is_numerical
Boolean specifying if the pmf represents numerical values or not.
The values could be symbolic, for example.
is_sparse : bool
`True` if `outcomes` and `pmf` represent a sparse distribution.
Private Attributes
------------------
_mask : tuple
A tuple of booleans specifying if the corresponding random variable
has been masked or not.
_meta : dict
A dictionary containing the meta information, described above.
_outcome_class : class
The class of all outcomes in the distribution.
_outcome_ctor : callable
A callable responsible for converting tuples to outcomes.
_outcomes_index : dict
A dictionary mapping outcomes to their index in self.outcomes.
_product : function
A specialized product function, similar to itertools.product. The
primary difference is that instead of yielding tuples, this product
function will yield objects which are of the same type as the outcomes.
_rvs : dict
A dictionary mapping random variable names to their index into the
outcomes of the distribution.
_sample_space : SampleSpace
The sample space of the distribution.
Public Attributes
-----------------
alphabet : tuple
A tuple representing the alphabet of the joint random variable. The
elements of the tuple are tuples, each of which represents the ordered
alphabet for a single random variable.
outcomes : tuple
The outcomes of the probability distribution.
ops : Operations instance
A class which manages addition and multiplication operations for log
and linear probabilities.
pmf : array-like
The probability mass function for the distribution. The elements of
this array are in a one-to-one correspondence with those in `outcomes`.
prng : RandomState
A pseudo-random number generator with a `rand` method which can
generate random numbers. For now, this is assumed to be something
with an API compatibile to NumPy's RandomState class. This attribute
is initialized to equal dit.math.prng.
Public Methods
--------------
from_distribution
Alternative constructor from an existing distribution.
atoms
Returns the atoms of the probability space.
coalesce
Returns a new joint distribution after coalescing random variables.
copy
Returns a deep copy of the distribution.
outcome_length
Returns the length of the outcomes in the distribution.
sample_space
Returns an iterator over the outcomes in the sample space.
get_base
Returns the base of the distribution.
get_rv_names
Returns the names of the random variables.
has_outcome
Returns `True` is the distribution has `outcome` in the sample space.
is_dense
Returns `True` if the distribution is dense.
is_homogeneous
Returns `True` if the alphabet for each random variable is the same.
is_joint
Returns `True` if the distribution is a joint distribution.
is_log
Returns `True` if the distribution values are log probabilities.
is_numerical
Returns `True` if the distribution values are numerical.
is_sparse
Returns `True` if the distribution is sparse.
marginal
Returns a marginal distribution of the specified random variables.
marginalize
Returns a marginal distribution after marginalizing random variables.
make_dense
Add all null outcomes to the pmf.
make_sparse
Remove all null outcomes from the pmf.
normalize
Normalizes the distribution.
sample
Returns a sample from the distribution.
set_base
Changes the base of the distribution, in-place.
set_rv_names
Sets the names of the random variables.
to_string
Returns a string representation of the distribution.
validate
A method to validate that the distribution is valid.
zipped
Returns an iterator over (outcome, probability) tuples. The probability
could be a log probability or a linear probability.
Implementation Notes
--------------------
The outcomes and pmf of the distribution are stored as a tuple and a NumPy
array, respectively. The sequences can both be sparse or dense. By sparse,
we do not mean that the representation is a NumPy sparse array. Rather,
we mean that the sequences need not contain every outcome in the sample
space. The order of the outcomes and probabilities will always match the
order of the sample space, even though their length might not equal the
length of the sample space.
"""
## Unadvertised attributes
_sample_space = None
_mask = None
_meta = None
_outcome_class = None
_outcome_ctor = None
_outcomes_index = None
_product = None
_rvs = None
_rv_mode = 'indices'
## Advertised attributes.
alphabet = None
outcomes = None
ops = None
pmf = None
prng = None
[docs] def __init__(self, outcomes, pmf=None, sample_space=None, base=None,
prng=None, sort=True, sparse=True, trim=True,
validate=True):
"""
Initialize the distribution.
Parameters
----------
outcomes : sequence, dict
The outcomes of the distribution. If `outcomes` is a dictionary,
then the keys are used as `outcomes`, and the values of
the dictionary are used as `pmf` instead. The values will not be
used if probabilities are passed in via `pmf`. Outcomes must be
hashable, orderable, sized, iterable containers. The length of an
outcome must be the same for all outcomes, and every outcome must
be of the same type.
pmf : sequence, None
The outcome probabilities or log probabilities. `pmf` can be None
only if `outcomes` is a dict.
sample_space : sequence, CartesianProduct
A sequence representing the sample space, and corresponding to the
complete set of possible outcomes. The order of the sample space
is important. If `None`, then the outcomes are used to determine
a Cartesian product sample space instead.
base : float, str, None
If `pmf` specifies log probabilities, then `base` should specify
the base of the logarithm. If 'linear', then `pmf` is assumed to
represent linear probabilities. If `None`, then the value for
`base` is taken from ditParams['base'].
prng : RandomState
A pseudo-random number generator with a `rand` method which can
generate random numbers. For now, this is assumed to be something
with an API compatibile to NumPy's RandomState class. This attribute
is initialized to equal dit.math.prng.
sort : bool
If `True`, then each random variable's alphabets are sorted before
they are finalized. Usually, this is desirable, as it normalizes
the behavior of distributions which have the same sample spaces
(when considered as a set). Note that addition and multiplication
of distributions is defined only if the sample spaces are
compatible.
sparse : bool
Specifies the form of the pmf. If `True`, then `outcomes` and `pmf`
will only contain entries for non-null outcomes and probabilities,
after initialization. The order of these entries will always obey
the order of `sample_space`, even if their number is not equal to
the size of the sample space. If `False`, then the pmf will be
dense and every outcome in the sample space will be represented.
trim : bool
Specifies if null-outcomes should be removed from pmf when
`make_sparse()` is called (assuming `sparse` is `True`) during
initialization.
validate : bool
If `True`, then validate the distribution. If `False`, then assume
the distribution is valid, and perform no checks.
Raises
------
InvalidDistribution
If the length of `values` and `outcomes` are unequal.
If no outcomes can be obtained from `pmf` and `outcomes` is `None`.
See :meth:`validate` for a list of other potential exceptions.
"""
# Note, we are not calling ScalarDistribution.__init__
# Instead, we want to call BaseDistribution.__init__.
# And BaseDistribution is the parent of ScalarDistribution.
# We do this because we want to init the prng AND ignore everything
# that ScalarDistribution does.
super(ScalarDistribution, self).__init__(prng) # pylint: disable=bad-super-call
# Set *instance* attributes
self._meta['is_joint'] = True
self._meta['is_numerical'] = True
self._meta['is_sparse'] = None
# Do any checks/conversions necessary to get the parameters.
outcomes, pmf = self._init(outcomes, pmf, base)
if len(outcomes) == 0 and sample_space is None:
msg = '`outcomes` must be nonempty if no sample space is given'
raise InvalidDistribution(msg)
if isinstance(sample_space, SampleSpace):
if not sample_space._meta['is_joint']:
msg = '`sample_space` must be a joint sample space.'
raise InvalidDistribution(msg)
if sort:
sample_space.sort()
self._outcome_class = sample_space._outcome_class
self._outcome_ctor = sample_space._outcome_ctor
self._product = sample_space._product
self._sample_space = sample_space
if isinstance(sample_space, CartesianProduct):
alphabets = sample_space.alphabets
else:
alphabets = construct_alphabets(sample_space._samplespace)
else:
if sample_space is None:
ss = outcomes
else:
ss = sample_space
alphabets = construct_alphabets(ss)
if sort:
alphabets = tuple(map(tuple, map(sorted, alphabets)))
## Set the outcome class, ctor, and product function.
## Assumption: the class of each outcome is the same.
klass = ss[0].__class__
self._outcome_class = klass
self._outcome_ctor = get_outcome_ctor(klass)
self._product = get_product_func(klass)
if sample_space is None:
self._sample_space = CartesianProduct(alphabets, self._product)
else:
self._sample_space = SampleSpace(ss)
# Sort everything to match the order of the sample space.
## Question: Using sort=False seems very strange and supporting it
## makes things harder, since we can't assume the outcomes
## and sample space are sorted. Is there a valid use case
## for an unsorted sample space?
if sort and len(outcomes) > 0:
outcomes, pmf, index = reorder(outcomes, pmf, self._sample_space)
else:
index = dict(zip(outcomes, range(len(outcomes))))
# Force the distribution to be numerical and a NumPy array.
self.pmf = np.asarray(pmf, dtype=float)
# Tuple outcomes, and an index.
self.outcomes = tuple(outcomes)
self._outcomes_index = index
self.alphabet = tuple(alphabets)
self.rvs = [[i] for i in range(self.outcome_length())]
# Mask
self._mask = self._new_mask()
if sparse:
self.make_sparse(trim=trim)
else:
self.make_dense()
if validate:
self.validate()
def _init(self, outcomes, pmf, base):
"""
Pre-initialization with various sanity checks.
"""
# Note: We've changed the behavior of _init here.
# In ScalarDistribution it returns a 3-tuple. Here, a 2-tuple.
# Attempt to grab outcomes and pmf from a dictionary
try:
outcomes_ = tuple(outcomes.keys())
pmf_ = tuple(outcomes.values())
except AttributeError:
pass
else:
outcomes = outcomes_
if pmf is not None:
msg = '`pmf` must be `None` if `outcomes` is a dict.'
raise InvalidDistribution(msg)
pmf = pmf_
if pmf is None:
msg = '`pmf` was `None` but `outcomes` was not a dict.'
raise InvalidDistribution(msg)
# Make sure pmf and outcomes are sequences
try:
len(outcomes)
len(pmf)
except TypeError:
raise TypeError('`outcomes` and `pmf` must be sequences.')
if len(pmf) != len(outcomes):
msg = "Unequal lengths for `pmf` and `outcomes`"
raise InvalidDistribution(msg)
# reorder() and other functions require that outcomes be indexable. So
# we make sure it is. We must check for zero length outcomes since, in
# principle, you can initialize with a 0-length `pmf` and `outcomes`.
if len(outcomes):
try:
outcomes[0]
except TypeError:
raise ditException('`outcomes` must be indexable.')
# Determine if the pmf represents log probabilities or not.
if base is None:
# Provide help for obvious case of linear probabilities.
from .validate import is_pmf
if is_pmf(np.asarray(pmf, dtype=float), LinearOperations()):
base = 'linear'
else:
base = ditParams['base']
self.ops = get_ops(base)
return outcomes, pmf
def _new_mask(self, from_mask=None, complement=None):
"""
Creates a new mask for the distribution.
Parameters
----------
from_mask : iter | None
Create a mask from an existing mask. If ``None``, then a mask
will be created which is ``False`` for each random variable.
complement : bool
If ``True``, invert the mask that would have been built.
This includes inverting the mask when ``from_mask=None``.
Returns
-------
mask : tuple
The newly created mask.
"""
if from_mask is None:
L = self.outcome_length(masked=False)
mask = [False for _ in range(L)]
else:
mask = [bool(b) for b in from_mask]
if complement:
mask = [not b for b in mask]
mask = tuple(mask)
self._mask = mask
return mask
@classmethod
def from_distribution(cls, dist, base=None, prng=None):
"""
Returns a new Distribution from an existing distribution.
Parameters
----------
dist : Distribution, ScalarDistribution
The existing distribution
base : 'linear', 'e', or float
Optionally, change the base of the new distribution. If `None`,
then the new distribution will have the same base as the existing
distribution.
prng : RandomState
A pseudo-random number generator with a `rand` method which can
generate random numbers. For now, this is assumed to be something
with an API compatible to NumPy's RandomState class. If `None`,
then we initialize to dit.math.prng. Importantly, we do not
copy the prng of the existing distribution. For that, see copy().
Returns
-------
d : Distribution
The new distribution.
"""
if dist.is_joint():
if not isinstance(dist, ScalarDistribution):
raise NotImplementedError
else:
# Assume it is a Distribution.
# Easiest way is to just copy it and then override the prng.
d = dist.copy(base=base)
else:
if not isinstance(dist, ScalarDistribution):
raise NotImplementedError
else:
# Assume it is a ScalarDistribution
from .convert import SDtoD
d = SDtoD(dist)
if base is not None:
d.set_base(base)
if prng is None:
# Do not use copied prng.
d.prng = np.random.RandomState()
else:
# Use specified prng.
d.prng = prng
return d
@classmethod
def from_ndarray(cls, ndarray, base=None, prng=None):
"""
Construct a Distribution from a pmf stored as an ndarray.
Parameters
----------
ndarray : np.ndarray
pmf in the form of an ndarray, where each axis is a variable and
the index along that axis is the variable's value.
base : 'linear', 'e', or float
Optionally, specify the base of the new distribution. If `None`,
then the new distribution will be assumed to have a linear
distribution.
prng : RandomState
A pseudo-random number generator with a `rand` method which can
generate random numbers. For now, this is assumed to be something
with an API compatible to NumPy's RandomState class. If `None`,
then we initialize to dit.math.prng.
Returns
-------
d : Distribution
The distribution resulting from interpreting `ndarray` as a pmf.
"""
return cls(*zip(*np.ndenumerate(ndarray)), base=base, prng=prng)
@property
def DataArray(self):
"""
Convert the distribution to a DataArray.
"""
import xarray as xr
alphabet = self.alphabet
dist = self.copy()
dist.make_dense()
sizes = [len(a) for a in alphabet]
pmf = dist.pmf.reshape(*sizes)
dims = dist.get_rv_names()
if dims is None:
dims = ['x[{}]'.format(i) for i in range(dist.outcome_length())]
coords = {d: sorted(a) for d, a in zip(dims, alphabet)}
return xr.DataArray(pmf, dims=dims, coords=coords)
@classmethod
def from_DataArray(cls, da, base=None, prng=None):
"""
Construct a Distribution from a pmf stored as a DataArray. Dimension
names and coordinates are extracted if available.
Parameters
----------
da : xr.DataArray
pmf in the form of a DataArray, where each axis is a variable and
the index along that axis is the variable's value.
base : 'linear', 'e', or float
Optionally, specify the base of the new distribution. If `None`,
then the new distribution will be assumed to have a linear
distribution.
prng : RandomState
A pseudo-random number generator with a `rand` method which can
generate random numbers. For now, this is assumed to be something
with an API compatible to NumPy's RandomState class. If `None`,
then we initialize to dit.math.prng.
Returns
-------
d : Distribution
The distribution resulting from interpreting `da` as a pmf.
"""
rv_names = da.dims
events, pmf = zip(*np.ndenumerate(da))
events = [tuple([da.coords[rv][_].values.flatten()[0] for _, rv in zip(e, rv_names)]) for e in events]
dist = Distribution(events, pmf, base=base, prng=prng)
dist.set_rv_names(rv_names)
return dist
@classmethod
def from_rv_discrete(cls, ssrv, prng=None):
"""
Create a Distribution from a scipy.states.rv_discrete instance.
Parameters
----------
ssrv : scipy.stats.rv_discrete
The random variable to convert to a dit.Distribution.
prng : RandomState
A pseudo-random number generator with a `rand` method which can
generate random numbers. For now, this is assumed to be something
with an API compatibile to NumPy's RandomState class. If `None`,
then we initialize to dit.math.prng.
Returns
-------
d : Distribution
A Distribution representation of `ssrv`.
"""
sd = ScalarDistribution.from_rv_discrete(ssrv=ssrv, prng=prng)
return cls.from_distribution(sd)
def __setitem__(self, outcome, value):
"""
Sets the probability associated with `outcome`.
Parameters
----------
outcome : outcome
Any hashable and equality comparable object in the sample space.
If `outcome` does not exist in the sample space, then an
InvalidOutcome exception is raised.
value : float
The probability or log probability of the outcome.
Returns
-------
p : float
The probability (or log probability) of the outcome.
Raises
------
InvalidOutcome
If `outcome` does not exist in the sample space.
Notes
-----
Setting the value of the outcome never deletes the outcome, even if the
value is equal to the null probabilty. After a setting operation,
the outcome will always exist in `outcomes` and `pmf`.
See Also
--------
__delitem__
"""
if not self.has_outcome(outcome, null=True):
# Then, the outcome is not in the sample space.
raise InvalidOutcome(outcome)
idx = self._outcomes_index.get(outcome, None)
new_outcome = idx is None
if not new_outcome:
# If the distribution is dense, we will always be here.
# If the distribution is sparse, then we are here for an existing
# outcome. In the sparse case, we *could* delete the outcome
# if the value was zero, but we have choosen to let setting always
# "set" and deleting always "delete".
self.pmf[idx] = value
else:
# Thus, the outcome is new in a sparse distribution. Even if the
# value is zero, we still set the value and add it to pmf.
# 1. Add the new outcome and probability
self.outcomes = self.outcomes + (outcome,)
self._outcomes_index[outcome] = len(self.outcomes) - 1
pmf = [p for p in self.pmf] + [value]
# 2. Reorder ### This call is different from Distribution
outcomes, pmf, index = reorder(self.outcomes, pmf,
self._sample_space)
# 3. Store
self.outcomes = tuple(outcomes)
self._outcomes_index = index
self.pmf = np.array(pmf, dtype=float)
def _validate_outcomes(self):
"""
Returns `True` if the outcomes are valid.
Valid means each outcome is in the sample space (and thus of the
proper class and proper length) and also that the outcome class
supports the Sequence idiom.
Returns
-------
v : bool
`True` if the outcomes are valid.
Raises
------
InvalidOutcome
When an outcome is not in the sample space.
"""
from .validate import validate_sequence
v = super(Distribution, self)._validate_outcomes()
# If we survived, then all outcomes have the same class.
# Now, we just need to make sure that class is a sequence.
v &= validate_sequence(self.outcomes[0])
return v
def coalesce(self, rvs, rv_mode=None, extract=False):
"""
Returns a new joint distribution after coalescing random variables.
Given n lists of random variables in the original joint distribution,
the coalesced distribution is a joint distribution over n random
variables. Each random variable is a coalescing of random variables
in the original joint distribution.
Parameters
----------
rvs : sequence
A sequence whose elements are also sequences. Each inner sequence
defines a random variable in the new distribution as a combination
of random variables in the original distribution. The length of
`rvs` must be at least one. The inner sequences need not be
pairwise mutually exclusive with one another, and each can contain
repeated random variables.
rv_mode : str, None
Specifies how to interpret the elements of `rvs`. Valid options
are: {'indices', 'names'}. If equal to 'indices', then the elements
of `rvs` are interpreted as random variable indices. If equal to
'names', the the elements are interpreted as random variable names.
If `None`, then the value of `dist._rv_mode` is consulted.
extract : bool
If the length of `rvs` is 1 and `extract` is `True`, then instead
of the new outcomes being 1-tuples, we extract the sole element to
create a joint distribution over the random variables in `rvs[0]`.
Returns
-------
d : distribution
The coalesced distribution.
Examples
--------
If we have a joint distribution ``d`` over 3 random variables such as:
A = (X,Y,Z)
and would like a new joint distribution over 6 random variables:
B = (X,Y,Z,X,Y,Z)
then this is achieved as:
>>> B = d.coalesce([[0,1,2,0,1,2]], extract=True)
If you want:
B = ((X,Y), (Y,Z))
Then you do:
>>> B = d.coalesce([[0,1],[1,2]])
Notes
-----
Generally, the outcomes of the new distribution will be tuples instead
of matching the outcome class of the original distribution. This is
because some outcome classes are not recursive containers. For example,
one cannot have a string of strings where each string consists of more
than one character. Note however, that it is perfectly valid to have
a tuple of tuples. Either way, the elements within each tuple of the
new distribution will still match the outcome class of the original
distribution.
See Also
--------
marginal, marginalize
"""
from array import array
# We allow repeats and want to keep the order. We don't need the names.
parse = lambda rv: parse_rvs(self, rv, rv_mode=rv_mode,
unique=False, sort=False)[1]
indexes = [parse(rv) for rv in rvs]
# Determine how new outcomes are constructed.
if len(rvs) == 1 and extract:
ctor_o = lambda x: x[0]
else:
ctor_o = tuple
if extract:
raise Exception('Cannot extract with more than one rv.')
# Determine how elements of new outcomes are constructed.
ctor_i = self._outcome_ctor
# Build the distribution.
factory = lambda: array('d')
d = defaultdict(factory)
for outcome, p in self.zipped():
# Build a list of inner outcomes. "c" stands for "constructed".
c_outcome = [ctor_i([outcome[i] for i in rv]) for rv in indexes]
# Build the outer outcome from the inner outcomes.
c_outcome = ctor_o(c_outcome)
d[c_outcome].append(p)
outcomes = tuple(d.keys())
pmf = map(np.frombuffer, d.values())
pmf = map(self.ops.add_reduce, pmf)
pmf = tuple(pmf)
# Preserve the sample space during coalescing.
sample_spaces = [self._sample_space.coalesce([idxes], extract=True)
for idxes in indexes]
if isinstance(self._sample_space, CartesianProduct):
sample_space = CartesianProduct(sample_spaces,
product=itertools.product)
if extract:
sample_space = sample_space.alphabets[0]
else:
if extract:
# There is only one sample space: len(indexes) = 1
sample_space = sample_spaces[0]
else:
sample_space = list(zip(*sample_spaces))
d = Distribution(outcomes, pmf,
base=self.get_base(),
sort=True,
sample_space=sample_space,
sparse=self.is_sparse(),
validate=False)
# We do not set the rv names, since these are new random variables.
# Set the mask
L = len(indexes)
d._mask = tuple(False for _ in range(L))
return d
[docs] def condition_on(self, crvs, rvs=None, rv_mode=None, extract=False):
"""
Returns distributions conditioned on random variables ``crvs``.
Optionally, ``rvs`` specifies which random variables should remain.
NOTE: Eventually this will return a conditional distribution.
Parameters
----------
crvs : list
The random variables to condition on.
rvs : list, None
The random variables for the resulting conditional distributions.
Any random variable not represented in the union of ``crvs`` and
``rvs`` will be marginalized. If ``None``, then every random
variable not appearing in ``crvs`` is used.
rv_mode : str, None
Specifies how to interpret ``crvs`` and ``rvs``. Valid options are:
{'indices', 'names'}. If equal to 'indices', then the elements
of ``crvs`` and ``rvs`` are interpreted as random variable indices.
If equal to 'names', the the elements are interpreted as random
varible names. If ``None``, then the value of ``self._rv_mode`` is
consulted, which defaults to 'indices'.
extract : bool
If the length of either ``crvs`` or ``rvs`` is 1 and ``extract`` is
``True``, then instead of the new outcomes being 1-tuples, we
extract the sole element to create scalar distributions.
Returns
-------
cdist : dist
The distribution of the conditioned random variables.
dists : list of distributions
The conditional distributions for each outcome in ``cdist``.
Examples
--------
First we build a distribution P(X,Y,Z) representing the XOR logic gate.
>>> pXYZ = dit.example_dists.Xor()
>>> pXYZ.set_rv_names('XYZ')
We can obtain the conditional distributions P(X,Z|Y) and the marginal
of the conditioned variable P(Y) as follows::
>>> pY, pXZgY = pXYZ.condition_on('Y')
If we specify ``rvs='Z'``, then only 'Z' is kept and thus, 'X' is
marginalized out::
>>> pY, pZgY = pXYZ.condition_on('Y', rvs='Z')
We can condition on two random variables::
>>> pXY, pZgXY = pXYZ.condition_on('XY')
The equivalent call using indexes is:
>>> pXY, pZgXY = pXYZ.condition_on([0, 1], rv_mode='indexes')
"""
crvs, cindexes = parse_rvs(self, crvs, rv_mode, unique=True, sort=True)
if rvs is None:
indexes = set(range(self.outcome_length())) - set(cindexes)
else:
rvs, indexes = parse_rvs(self, rvs, rv_mode, unique=True, sort=True)
union = set(cindexes).union(indexes)
if len(union) != len(cindexes) + len(indexes):
raise ditException('`crvs` and `rvs` must have no intersection.')
# Marginalize the random variables not in crvs or rvs
if len(union) < self.outcome_length():
mapping = dict(zip(sorted(union), range(len(union))))
d = self.marginal(union, rv_mode=RV_MODES.INDICES)
# Now we need to shift the indices to their new index values.
cindexes = [mapping[idx] for idx in cindexes]
indexes = [mapping[idx] for idx in indexes]
else:
# Make a copy so we don't have to worry about changing the input
# distribution when we make it sparse.
d = self.copy()
# It's just easier to not worry about conditioning on zero probs.
sparse = d.is_sparse()
d.make_sparse()
# Note that any previous mask of d from the marginalization will be
# ignored when we take new marginals. This is desirable here.
cdist = d.marginal(cindexes, rv_mode=RV_MODES.INDICES)
dist = d.marginal(indexes, rv_mode=RV_MODES.INDICES)
sample_space = dist._sample_space
rv_names = dist.get_rv_names()
ops = d.ops
base = ops.get_base()
ctor = d._outcome_ctor
# A list of indexes of conditioned outcomes for each joint outcome.
# These are the indexes of w in the pmf of P(w) for each ws in P(ws).
cidx = cdist._outcomes_index
coutcomes = [cidx[ctor([o[i] for i in cindexes])] for o in d.outcomes]
# A list of indexes of outcomes for each joint outcome.
# These are the indexes of s in the pmf of P(s) for each ws in P(ws).
idx = dist._outcomes_index
outcomes = [idx[ctor([o[i] for i in indexes])] for o in d.outcomes]
cprobs = np.array([ops.invert(cdist.pmf[i]) for i in coutcomes])
probs = ops.mult(d.pmf, cprobs)
# Now build the distributions
pmfs = np.empty((len(cdist), len(dist)), dtype=float)
pmfs.fill(ops.zero)
for i, (coutcome, outcome) in enumerate(zip(coutcomes, outcomes)):
pmfs[coutcome, outcome] = probs[i]
dists = [Distribution(dist.outcomes, pmfs[i], sparse=sparse,
base=base, sample_space=sample_space, validate=False)
for i in range(pmfs.shape[0])]
# Set the masks and r.v. names for each conditional distribution.
for dd in dists:
dd._new_mask(from_mask=dist._mask)
dd.set_rv_names(rv_names)
if extract:
if len(cindexes) == 1:
cdist = ScalarDistribution.from_distribution(cdist)
if len(indexes) == 1:
dists = [ScalarDistribution.from_distribution(d) for d in dists]
return cdist, dists
def copy(self, base=None):
"""
Returns a (deep) copy of the distribution.
Parameters
----------
base : 'linear', 'e', or float
Optionally, copy and change the base of the copied distribution.
If `None`, then the copy will keep the same base.
"""
from copy import deepcopy
# Make an exact copy of the PRNG.
prng = np.random.RandomState()
prng.set_state(self.prng.get_state())
d = _make_distribution(outcomes=deepcopy(self.outcomes),
pmf=np.array(self.pmf, copy=True),
base=self.ops.base,
sample_space=deepcopy(self._sample_space),
prng=prng,
sparse=self._meta['is_sparse'])
if base is not None:
d.set_base(base)
# The following are not initialize-able from the constructor.
d.set_rv_names(self.get_rv_names())
d._mask = tuple(self._mask)
return d
def outcome_length(self, masked=False):
"""
Returns the length of outcomes in the joint distribution.
This is also equal to the number of random variables in the joint
distribution. This value is fixed once the distribution is initialized.
Parameters
----------
masked : bool
If `True`, then the outcome length additionally includes masked
random variables. If `False`, then the outcome length does not
include masked random variables. Including the masked random
variables is not usually helpful since that represents the outcome
length of a different, unmarginalized distribution.
"""
if masked:
return len(self._mask)
else:
# Equivalently: sum(self._mask)
# Equivalently: len(self.outcomes[0])
# Recall, self.alphabet contains only the unmasked/valid rvs.
return len(self.alphabet)
def get_rv_names(self):
"""
Returns the names of the random variables.
Returns
-------
rv_names : tuple or None
A tuple with length equal to the outcome length, containing the
names of the random variables in the distribution. If no random
variable names have been set, then None is returned.
"""
if self._rvs is None:
rv_names = None
else:
# _rvs is a dict mapping random variable names to indexes.
rv_names = [x for x in self._rvs.items()]
# Sort by index.
rv_names.sort(key=itemgetter(1))
# Keep only the sorted names.
rv_names = tuple(map(itemgetter(0), rv_names))
return rv_names
def has_outcome(self, outcome, null=True):
"""
Returns `True` if `outcome` exists in the sample space.
Whether or not an outcome is in the sample space is a separate question
from whether or not an outcome currently appears in the pmf.
See __contains__ for this latter question.
Parameters
----------
outcome : outcome
The outcome to be tested.
null : bool
Specifies if null outcomes are acceptable. If `True`, then null
outcomes are acceptable. Thus, the only requirement on `outcome`
is that it exist in the distribution's sample space. If `False`,
then null outcomes are not acceptable. Thus, `outcome` must exist
in the distribution's sample space and be a nonnull outcome.
Notes
-----
This is an O( len(outcome) ) operation.
"""
# Make sure the outcome exists in the sample space.
is_atom = outcome in self._sample_space
if not is_atom:
# Outcome does not exist in the sample space.
return False
elif null:
# Outcome exists in the sample space and we don't care about
# whether it represents a null probability.
return True
else:
idx = self._outcomes_index.get(outcome, None)
if idx is None:
# Outcome is not represented in pmf and thus, represents
# a null probability.
return False
else:
# Outcome is in pmf. We still need to test if it represents
# a null probability.
return self.pmf[idx] > self.ops.zero
def is_homogeneous(self):
"""
Returns `True` if the alphabet for each random variable is the same.
"""
if len(self.alphabet) == 0:
# Degenerate case: No random variables, no alphabet.
return True
a1 = self.alphabet[0]
try:
h = all(a2 == a1 for a2 in self.alphabet[1:])
except ValueError:
try:
h = all(np.equal(a1, a2).all() for a2 in self.alphabet[1:])
h &= all(len(a1) == len(a2) for a2 in self.alphabet[1:])
except ValueError:
return False
return h
[docs] def marginal(self, rvs, rv_mode=None):
"""
Returns a marginal distribution.
Parameters
----------
rvs : list
The random variables to keep. All others are marginalized.
rv_mode : str, None
Specifies how to interpret the elements of `rvs`. Valid options
are: {'indices', 'names'}. If equal to 'indices', then the elements
of `rvs` are interpreted as random variable indices. If equal to
'names', the the elements are interpreted as random variable names.
If `None`, then the value of `self._rv_mode` is consulted.
Returns
-------
d : joint distribution
A new joint distribution with the random variables in `rvs`
kept and all others marginalized.
"""
# For marginals, we must have unique indexes. Additionally, we do
# not allow the order of the random variables to change. So we sort.
# We parse the rv_mode now, so that we can reassign their names
# after coalesce has finished.
rvs, indexes = parse_rvs(self, rvs, rv_mode, unique=True, sort=True)
## Eventually, add in a method specialized for dense distributions.
## This one would work only with the pmf, and not the outcomes.
# Marginalization is a special case of coalescing where there is only
# one new random variable and it is composed of a strict subset of
# the original random variables, with no duplicates, that maintains
# the order of the original random variables.
d = self.coalesce([indexes], rv_mode=RV_MODES.INDICES, extract=True)
# Handle parts of d that are not settable through initialization.
# Set the random variable names
if self._rvs is None:
# There are no names...
names = None
else:
# We only have the indexes...so reverse lookup to get the names.
names_, indexes_ = self._rvs.keys(), self._rvs.values()
rev = dict(zip(indexes_, names_))
names = [rev[i] for i in indexes]
d.set_rv_names(names)
# Set the mask
L = self.outcome_length()
d._mask = tuple(False if i in indexes else True for i in range(L))
return d
[docs] def marginalize(self, rvs, rv_mode=None):
"""
Returns a new distribution after marginalizing random variables.
Parameters
----------
rvs : list
The random variables to marginalize. All others are kept.
rv_mode : str, None
Specifies how to interpret the elements of `rvs`. Valid options
are: {'indices', 'names'}. If equal to 'indices', then the elements
of `rvs` are interpreted as random variable indices. If equal to
'names', the the elements are interpreted as random variable names.
If `None`, then the value of `self._rv_mode` is consulted.
Returns
-------
d : joint distribution
A new joint distribution with the random variables in `rvs`
marginalized and all others kept.
"""
rvs, indexes = parse_rvs(self, rvs, rv_mode)
indexes = set(indexes)
all_indexes = range(self.outcome_length())
marginal_indexes = [i for i in all_indexes if i not in indexes]
d = self.marginal(marginal_indexes, rv_mode=RV_MODES.INDICES)
return d
def set_rv_names(self, rv_names):
"""
Sets the names of the random variables.
Returns
-------
rv_names : tuple
A tuple with length equal to the outcome length, containing the
names of the random variables in the distribution.
"""
if rv_names is None:
# This is an explicit clearing of the rv names.
rvs = None
else:
L = self.outcome_length()
if len(set(rv_names)) < L:
raise ditException('Too few unique random variable names.')
elif len(set(rv_names)) > L:
raise ditException('Too many unique random variable names.')
if L > 0:
rvs = dict(zip(rv_names, range(L)))
else:
# This is a corner case of a distribution with 0 rvs.
# We keep rvs equal to None, instead of an empty dict.
rvs = None
self._rvs = rvs
if self._rvs is not None:
# Unsure if we should change this automatically.
self._rv_mode = 'names'
def to_html(self, digits=None, exact=None, tol=1e-9): # pragma: no cover
"""
Construct an HTML representation of the distribution.
Returns
-------
output : str
An HTML version of this distribution.
"""
from .distribution import prepare_string
if exact is None:
exact = ditParams['print.exact']
x = prepare_string(self, digits, exact, tol)
pmf, outcomes, base, colsep, max_length, pstr = x
# Alphabet
if len(self.alphabet) == 0:
alpha = "()"
elif self.is_homogeneous():
alpha = str(self.alphabet[0]) + " for all rvs"
else:
alpha = str(self.alphabet)
# Outcome class
outcome_class = self._outcome_class
if outcome_class is not None:
outcome_class = outcome_class.__name__
info = [
("Class", self.__class__.__name__),
("Alphabet", alpha),
("Base", base),
("Outcome Class", outcome_class),
("Outcome Lenght", self.outcome_length()),
]
infos = ''.join("<tr><th>{}:</th><td>{}</td></tr>".format(a, b) for a, b in info)
header = '<table border="1">{}</table>'.format(infos)
rv_names = self.get_rv_names()
if rv_names is None:
rv_names = ["x[{}]".format(i) for i in range(self.outcome_length())]
table_header = '<tr>' + ''.join("<th>{}</th>".format(a) for a in rv_names) + "<th>{}</th></tr>".format(pstr)
table_rows = ''.join(
'<tr>' + ''.join('<td>{}</td>'.format(str(_)) for _ in o) + '<td>{}</td></tr>'.format(p) for o, p in zip(self.outcomes, pmf))
table = '<table>{}{}</table>'.format(table_header, table_rows)
output = '<div><div style="float: left">{}</div><div style="float: left">{}</div></div>'.format(header, table)
return output
def to_string(self, digits=None, exact=None, tol=1e-9, show_mask=False,
str_outcomes=False):
"""
Returns a string representation of the distribution.
Parameters
----------
digits : int or None
The probabilities will be rounded to the specified number of
digits, using NumPy's around function. If `None`, then no rounding
is performed. Note, if the number of digits is greater than the
precision of the floats, then the resultant number of digits will
match that smaller precision.
exact : bool
If `True`, then linear probabilities will be displayed, even if
the underlying pmf contains log probabilities. The closest
rational fraction within a tolerance specified by `tol` is used
as the display value.
tol : float
If `exact` is `True`, then the probabilities will be displayed
as the closest rational fraction within `tol`.
show_mask : bool
If `True`, show the outcomes in the proper context. Thus, masked
and unmasked random variables are shown. If `show_mask` is anything
other than `True` or `False`, it is used as the wildcard symbol.
str_outcomes
If `True`, then attempt to convert outcomes which are tuples to
just strings. This is just a dislplay technique.
Returns
-------
s : str
A string representation of the distribution.
"""
from .distribution import prepare_string
from six import StringIO
if exact is None:
exact = ditParams['print.exact']
s = StringIO()
x = prepare_string(self, digits, exact, tol, show_mask, str_outcomes)
pmf, outcomes, base, colsep, max_length, pstr = x
headers = [
"Class",
"Alphabet",
"Base",
"Outcome Class",
"Outcome Length",
"RV Names"
]
vals = []
# Class
vals.append(self.__class__.__name__)
# Alphabet
if len(self.alphabet) == 0:
alpha = "()"
elif self.is_homogeneous():
alpha = str(self.alphabet[0]) + " for all rvs"
else:
alpha = str(self.alphabet)
vals.append(alpha)
# Base
vals.append(base)
# Outcome class
outcome_class = self._outcome_class
if outcome_class is not None:
outcome_class = outcome_class.__name__
vals.append(outcome_class)
# Outcome length
if show_mask:
outcome_length = "{0} (mask: {1})"
outcome_length = outcome_length.format(self.outcome_length(),
len(self._mask))
else:
outcome_length = str(self.outcome_length())
vals.append(outcome_length)
# Random variable names
rv_names = self.get_rv_names()
vals.append(rv_names)
# Info
L = max(map(len, headers))
for head, val in zip(headers, vals):
s.write("{0}{1}\n".format("{0}: ".format(head).ljust(L+2), val))
s.write("\n")
# Distribution
s.write(''.join(['x'.ljust(max_length), colsep, pstr, "\n"]))
# Adjust for empty outcomes. Min length should be: len('x') == 1
max_length = max(1, max_length)
for o, p in zip(outcomes, pmf):
s.write(''.join([o.ljust(max_length), colsep, str(p), "\n"]))
s.seek(0)
s = s.read()
# Remove the last \n
s = s[:-1]
return s