Source code for stable_baselines.common.distributions

import numpy as np
import tensorflow as tf
from tensorflow.python.ops import math_ops
from gym import spaces

from stable_baselines.a2c.utils import linear

[docs]class ProbabilityDistribution(object): """ A particular probability distribution """
[docs] def flatparam(self): """ Return the direct probabilities :return: ([float]) the probabilites """ raise NotImplementedError
[docs] def mode(self): """ Returns the probability :return: (Tensorflow Tensor) the deterministic action """ raise NotImplementedError
[docs] def neglogp(self, x): """ returns the of the negative log likelihood :param x: (str) the labels of each index :return: ([float]) The negative log likelihood of the distribution """ # Usually it's easier to define the negative logprob raise NotImplementedError
[docs] def kl(self, other): """ Calculates the Kullback-Leiber divergence from the given probabilty distribution :param other: ([float]) the distibution to compare with :return: (float) the KL divergence of the two distributions """ raise NotImplementedError
[docs] def entropy(self): """ Returns shannon's entropy of the probability :return: (float) the entropy """ raise NotImplementedError
[docs] def sample(self): """ returns a sample from the probabilty distribution :return: (Tensorflow Tensor) the stochastic action """ raise NotImplementedError
[docs] def logp(self, x): """ returns the of the log likelihood :param x: (str) the labels of each index :return: ([float]) The log likelihood of the distribution """ return - self.neglogp(x)
[docs]class ProbabilityDistributionType(object): """ Parametrized family of probability distributions """
[docs] def probability_distribution_class(self): """ returns the ProbabilityDistribution class of this type :return: (Type ProbabilityDistribution) the probability distribution class associated """ raise NotImplementedError
[docs] def proba_distribution_from_flat(self, flat): """ Returns the probability distribution from flat probabilities flat: flattened vector of parameters of probability distribution :param flat: ([float]) the flat probabilities :return: (ProbabilityDistribution) the instance of the ProbabilityDistribution associated """ return self.probability_distribution_class()(flat)
[docs] def proba_distribution_from_latent(self, pi_latent_vector, vf_latent_vector, init_scale=1.0, init_bias=0.0): """ returns the probability distribution from latent values :param pi_latent_vector: ([float]) the latent pi values :param vf_latent_vector: ([float]) the latent vf values :param init_scale: (float) the inital scale of the distribution :param init_bias: (float) the inital bias of the distribution :return: (ProbabilityDistribution) the instance of the ProbabilityDistribution associated """ raise NotImplementedError
[docs] def param_shape(self): """ returns the shape of the input parameters :return: ([int]) the shape """ raise NotImplementedError
[docs] def sample_shape(self): """ returns the shape of the sampling :return: ([int]) the shape """ raise NotImplementedError
[docs] def sample_dtype(self): """ returns the type of the sampling :return: (type) the type """ raise NotImplementedError
[docs] def param_placeholder(self, prepend_shape, name=None): """ returns the TensorFlow placeholder for the input parameters :param prepend_shape: ([int]) the prepend shape :param name: (str) the placeholder name :return: (TensorFlow Tensor) the placeholder """ return tf.placeholder(dtype=tf.float32, shape=prepend_shape + self.param_shape(), name=name)
[docs] def sample_placeholder(self, prepend_shape, name=None): """ returns the TensorFlow placeholder for the sampling :param prepend_shape: ([int]) the prepend shape :param name: (str) the placeholder name :return: (TensorFlow Tensor) the placeholder """ return tf.placeholder(dtype=self.sample_dtype(), shape=prepend_shape + self.sample_shape(), name=name)
[docs]class CategoricalProbabilityDistributionType(ProbabilityDistributionType): def __init__(self, n_cat): """ The probability distribution type for categorical input :param n_cat: (int) the number of categories """ self.n_cat = n_cat
[docs] def probability_distribution_class(self): return CategoricalProbabilityDistribution
[docs] def proba_distribution_from_latent(self, pi_latent_vector, vf_latent_vector, init_scale=1.0, init_bias=0.0): pdparam = linear(pi_latent_vector, 'pi', self.n_cat, init_scale=init_scale, init_bias=init_bias) q_values = linear(vf_latent_vector, 'q', self.n_cat, init_scale=init_scale, init_bias=init_bias) return self.proba_distribution_from_flat(pdparam), pdparam, q_values
[docs] def param_shape(self): return [self.n_cat]
[docs] def sample_shape(self): return []
[docs] def sample_dtype(self): return tf.int32
[docs]class MultiCategoricalProbabilityDistributionType(ProbabilityDistributionType): def __init__(self, n_vec): """ The probability distribution type for multiple categorical input :param n_vec: ([int]) the vectors """ # Cast the variable because tf does not allow uint32 self.n_vec = n_vec.astype(np.int32) # Check that the cast was valid assert (self.n_vec > 0).all(), "Casting uint32 to int32 was invalid"
[docs] def probability_distribution_class(self): return MultiCategoricalProbabilityDistribution
[docs] def proba_distribution_from_flat(self, flat): return MultiCategoricalProbabilityDistribution(self.n_vec, flat)
[docs] def proba_distribution_from_latent(self, pi_latent_vector, vf_latent_vector, init_scale=1.0, init_bias=0.0): pdparam = linear(pi_latent_vector, 'pi', sum(self.n_vec), init_scale=init_scale, init_bias=init_bias) q_values = linear(vf_latent_vector, 'q', sum(self.n_vec), init_scale=init_scale, init_bias=init_bias) return self.proba_distribution_from_flat(pdparam), pdparam, q_values
[docs] def param_shape(self): return [sum(self.n_vec)]
[docs] def sample_shape(self): return [len(self.n_vec)]
[docs] def sample_dtype(self): return tf.int32
[docs]class DiagGaussianProbabilityDistributionType(ProbabilityDistributionType): def __init__(self, size): """ The probability distribution type for multivariate gaussian input :param size: (int) the number of dimensions of the multivariate gaussian """ self.size = size
[docs] def probability_distribution_class(self): return DiagGaussianProbabilityDistribution
[docs] def proba_distribution_from_flat(self, flat): """ returns the probability distribution from flat probabilities :param flat: ([float]) the flat probabilities :return: (ProbabilityDistribution) the instance of the ProbabilityDistribution associated """ return self.probability_distribution_class()(flat)
[docs] def proba_distribution_from_latent(self, pi_latent_vector, vf_latent_vector, init_scale=1.0, init_bias=0.0): mean = linear(pi_latent_vector, 'pi', self.size, init_scale=init_scale, init_bias=init_bias) logstd = tf.get_variable(name='pi/logstd', shape=[1, self.size], initializer=tf.zeros_initializer()) pdparam = tf.concat([mean, mean * 0.0 + logstd], axis=1) q_values = linear(vf_latent_vector, 'q', self.size, init_scale=init_scale, init_bias=init_bias) return self.proba_distribution_from_flat(pdparam), mean, q_values
[docs] def param_shape(self): return [2 * self.size]
[docs] def sample_shape(self): return [self.size]
[docs] def sample_dtype(self): return tf.float32
[docs]class BernoulliProbabilityDistributionType(ProbabilityDistributionType): def __init__(self, size): """ The probability distribution type for bernoulli input :param size: (int) the number of dimensions of the bernoulli distribution """ self.size = size
[docs] def probability_distribution_class(self): return BernoulliProbabilityDistribution
[docs] def proba_distribution_from_latent(self, pi_latent_vector, vf_latent_vector, init_scale=1.0, init_bias=0.0): pdparam = linear(pi_latent_vector, 'pi', self.size, init_scale=init_scale, init_bias=init_bias) q_values = linear(vf_latent_vector, 'q', self.size, init_scale=init_scale, init_bias=init_bias) return self.proba_distribution_from_flat(pdparam), pdparam, q_values
[docs] def param_shape(self): return [self.size]
[docs] def sample_shape(self): return [self.size]
[docs] def sample_dtype(self): return tf.int32
[docs]class CategoricalProbabilityDistribution(ProbabilityDistribution): def __init__(self, logits): """ Probability distributions from categorical input :param logits: ([float]) the categorical logits input """ self.logits = logits
[docs] def flatparam(self): return self.logits
[docs] def mode(self): return tf.argmax(self.logits, axis=-1)
[docs] def neglogp(self, x): # Note: we can't use sparse_softmax_cross_entropy_with_logits because # the implementation does not allow second-order derivatives... one_hot_actions = tf.one_hot(x, self.logits.get_shape().as_list()[-1]) return tf.nn.softmax_cross_entropy_with_logits_v2( logits=self.logits, labels=tf.stop_gradient(one_hot_actions))
[docs] def kl(self, other): a_0 = self.logits - tf.reduce_max(self.logits, axis=-1, keepdims=True) a_1 = other.logits - tf.reduce_max(other.logits, axis=-1, keepdims=True) exp_a_0 = tf.exp(a_0) exp_a_1 = tf.exp(a_1) z_0 = tf.reduce_sum(exp_a_0, axis=-1, keepdims=True) z_1 = tf.reduce_sum(exp_a_1, axis=-1, keepdims=True) p_0 = exp_a_0 / z_0 return tf.reduce_sum(p_0 * (a_0 - tf.log(z_0) - a_1 + tf.log(z_1)), axis=-1)
[docs] def entropy(self): a_0 = self.logits - tf.reduce_max(self.logits, axis=-1, keepdims=True) exp_a_0 = tf.exp(a_0) z_0 = tf.reduce_sum(exp_a_0, axis=-1, keepdims=True) p_0 = exp_a_0 / z_0 return tf.reduce_sum(p_0 * (tf.log(z_0) - a_0), axis=-1)
[docs] def sample(self): # Gumbel-max trick to sample # a categorical distribution (see uniform = tf.random_uniform(tf.shape(self.logits), dtype=self.logits.dtype) return tf.argmax(self.logits - tf.log(-tf.log(uniform)), axis=-1)
[docs] @classmethod def fromflat(cls, flat): """ Create an instance of this from new logits values :param flat: ([float]) the categorical logits input :return: (ProbabilityDistribution) the instance from the given categorical input """ return cls(flat)
[docs]class MultiCategoricalProbabilityDistribution(ProbabilityDistribution): def __init__(self, nvec, flat): """ Probability distributions from multicategorical input :param nvec: ([int]) the sizes of the different categorical inputs :param flat: ([float]) the categorical logits input """ self.flat = flat self.categoricals = list(map(CategoricalProbabilityDistribution, tf.split(flat, nvec, axis=-1)))
[docs] def flatparam(self): return self.flat
[docs] def mode(self): return tf.cast(tf.stack([p.mode() for p in self.categoricals], axis=-1), tf.int32)
[docs] def neglogp(self, x): return tf.add_n([p.neglogp(px) for p, px in zip(self.categoricals, tf.unstack(x, axis=-1))])
[docs] def kl(self, other): return tf.add_n([p.kl(q) for p, q in zip(self.categoricals, other.categoricals)])
[docs] def entropy(self): return tf.add_n([p.entropy() for p in self.categoricals])
[docs] def sample(self): return tf.cast(tf.stack([p.sample() for p in self.categoricals], axis=-1), tf.int32)
[docs] @classmethod def fromflat(cls, flat): """ Create an instance of this from new logits values :param flat: ([float]) the multi categorical logits input :return: (ProbabilityDistribution) the instance from the given multi categorical input """ raise NotImplementedError
[docs]class DiagGaussianProbabilityDistribution(ProbabilityDistribution): def __init__(self, flat): """ Probability distributions from multivariate gaussian input :param flat: ([float]) the multivariate gaussian input data """ self.flat = flat mean, logstd = tf.split(axis=len(flat.shape) - 1, num_or_size_splits=2, value=flat) self.mean = mean self.logstd = logstd self.std = tf.exp(logstd)
[docs] def flatparam(self): return self.flat
[docs] def mode(self): # Bounds are taken into account outside this class (during training only) return self.mean
[docs] def neglogp(self, x): return 0.5 * tf.reduce_sum(tf.square((x - self.mean) / self.std), axis=-1) \ + 0.5 * np.log(2.0 * np.pi) * tf.cast(tf.shape(x)[-1], tf.float32) \ + tf.reduce_sum(self.logstd, axis=-1)
[docs] def kl(self, other): assert isinstance(other, DiagGaussianProbabilityDistribution) return tf.reduce_sum(other.logstd - self.logstd + (tf.square(self.std) + tf.square(self.mean - other.mean)) / (2.0 * tf.square(other.std)) - 0.5, axis=-1)
[docs] def entropy(self): return tf.reduce_sum(self.logstd + .5 * np.log(2.0 * np.pi * np.e), axis=-1)
[docs] def sample(self): # Bounds are taken into acount outside this class (during training only) # Otherwise, it changes the distribution and breaks PPO2 for instance return self.mean + self.std * tf.random_normal(tf.shape(self.mean), dtype=self.mean.dtype)
[docs] @classmethod def fromflat(cls, flat): """ Create an instance of this from new multivariate gaussian input :param flat: ([float]) the multivariate gaussian input data :return: (ProbabilityDistribution) the instance from the given multivariate gaussian input data """ return cls(flat)
[docs]class BernoulliProbabilityDistribution(ProbabilityDistribution): def __init__(self, logits): """ Probability distributions from bernoulli input :param logits: ([float]) the bernoulli input data """ self.logits = logits self.probabilities = tf.sigmoid(logits)
[docs] def flatparam(self): return self.logits
[docs] def mode(self): return tf.round(self.probabilities)
[docs] def neglogp(self, x): return tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=tf.cast(x, tf.float32)), axis=-1)
[docs] def kl(self, other): return tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(logits=other.logits, labels=self.probabilities), axis=-1) - \ tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=self.probabilities), axis=-1)
[docs] def entropy(self): return tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=self.probabilities), axis=-1)
[docs] def sample(self): samples_from_uniform = tf.random_uniform(tf.shape(self.probabilities)) return tf.cast(math_ops.less(samples_from_uniform, self.probabilities), tf.float32)
[docs] @classmethod def fromflat(cls, flat): """ Create an instance of this from new bernoulli input :param flat: ([float]) the bernoulli input data :return: (ProbabilityDistribution) the instance from the given bernoulli input data """ return cls(flat)
[docs]def make_proba_dist_type(ac_space): """ return an instance of ProbabilityDistributionType for the correct type of action space :param ac_space: (Gym Space) the input action space :return: (ProbabilityDistributionType) the approriate instance of a ProbabilityDistributionType """ if isinstance(ac_space, spaces.Box): assert len(ac_space.shape) == 1, "Error: the action space must be a vector" return DiagGaussianProbabilityDistributionType(ac_space.shape[0]) elif isinstance(ac_space, spaces.Discrete): return CategoricalProbabilityDistributionType(ac_space.n) elif isinstance(ac_space, spaces.MultiDiscrete): return MultiCategoricalProbabilityDistributionType(ac_space.nvec) elif isinstance(ac_space, spaces.MultiBinary): return BernoulliProbabilityDistributionType(ac_space.n) else: raise NotImplementedError("Error: probability distribution, not implemented for action space of type {}." .format(type(ac_space)) + " Must be of type Gym Spaces: Box, Discrete, MultiDiscrete or MultiBinary.")
[docs]def shape_el(tensor, index): """ get the shape of a TensorFlow Tensor element :param tensor: (TensorFlow Tensor) the input tensor :param index: (int) the element :return: ([int]) the shape """ maybe = tensor.get_shape()[index] if maybe is not None: return maybe else: return tf.shape(tensor)[index]