Source code for stable_baselines.common.policies

import warnings
from itertools import zip_longest
from abc import ABC

import numpy as np
import tensorflow as tf
from gym.spaces import Discrete

from stable_baselines.a2c.utils import conv, linear, conv_to_fc, batch_to_seq, seq_to_batch, lstm
from stable_baselines.common.distributions import make_proba_dist_type
from stable_baselines.common.input import observation_input


def nature_cnn(scaled_images, **kwargs):
    """
    CNN from Nature paper.

    :param scaled_images: (TensorFlow Tensor) Image input placeholder
    :param kwargs: (dict) Extra keyword parameters for the convolutional layers of the CNN
    :return: (TensorFlow Tensor) The CNN output layer
    """
    activ = tf.nn.relu
    layer_1 = activ(conv(scaled_images, 'c1', n_filters=32, filter_size=8, stride=4, init_scale=np.sqrt(2), **kwargs))
    layer_2 = activ(conv(layer_1, 'c2', n_filters=64, filter_size=4, stride=2, init_scale=np.sqrt(2), **kwargs))
    layer_3 = activ(conv(layer_2, 'c3', n_filters=64, filter_size=3, stride=1, init_scale=np.sqrt(2), **kwargs))
    layer_3 = conv_to_fc(layer_3)
    return activ(linear(layer_3, 'fc1', n_hidden=512, init_scale=np.sqrt(2)))
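
# A minimal usage sketch (an illustrative assumption, not part of the module): the
# nature CNN expects an already scaled NHWC image batch, e.g. Atari frames of shape
# (batch, 84, 84, 4) in [0, 1], and returns a 512-unit latent vector per image.
#
#     scaled_images = tf.placeholder(tf.float32, shape=(None, 84, 84, 4))
#     cnn_latent = nature_cnn(scaled_images)  # Tensor of shape (None, 512)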


def mlp_extractor(flat_observations, net_arch, act_fun):
    """
    Constructs an MLP that receives observations as an input and outputs a latent representation for the policy and
    a value network. The ``net_arch`` parameter allows specifying the number and size of the hidden layers and how many
    of them are shared between the policy network and the value network. It is assumed to be a list with the following
    structure:

    1. An arbitrary number of integers (zero allowed), each specifying the number of units in a shared layer.
       If the number of ints is zero, there will be no shared layers.
    2. An optional dict, to specify the following non-shared layers for the value network and the policy network.
       It is formatted like ``dict(vf=[<value layer sizes>], pi=[<policy layer sizes>])``.
       If either key (pi or vf) is missing, an empty list (no non-shared layers) is assumed for it.

    For example, to construct a network with one shared layer of size 55, followed by two non-shared layers of size 255
    for the value network and a single non-shared layer of size 128 for the policy network, the following ``net_arch``
    would be used: ``[55, dict(vf=[255, 255], pi=[128])]``. A simple shared network topology with two layers of size
    128 would be specified as ``[128, 128]``.

    :param flat_observations: (tf.Tensor) The observations to base policy and value function on.
    :param net_arch: ([int or dict]) The specification of the policy and value networks.
        See above for details on its formatting.
    :param act_fun: (tf function) The activation function to use for the networks.
    :return: (tf.Tensor, tf.Tensor) latent_policy, latent_value of the specified network.
        If all layers are shared, then ``latent_policy == latent_value``
    """
    latent = flat_observations
    policy_only_layers = []  # Layer sizes of the network that only belong to the policy network
    value_only_layers = []  # Layer sizes of the network that only belong to the value network

    # Iterate through the shared layers and build the shared parts of the network
    for idx, layer in enumerate(net_arch):
        if isinstance(layer, int):  # Check that this is a shared layer
            layer_size = layer
            latent = act_fun(linear(latent, "shared_fc{}".format(idx), layer_size, init_scale=np.sqrt(2)))
        else:
            assert isinstance(layer, dict), "Error: the net_arch list can only contain ints and dicts"
            if 'pi' in layer:
                assert isinstance(layer['pi'], list), "Error: net_arch[-1]['pi'] must contain a list of integers."
                policy_only_layers = layer['pi']

            if 'vf' in layer:
                assert isinstance(layer['vf'], list), "Error: net_arch[-1]['vf'] must contain a list of integers."
                value_only_layers = layer['vf']
            break  # From here on the network splits up in policy and value network

    # Build the non-shared part of the network
    latent_policy = latent
    latent_value = latent
    for idx, (pi_layer_size, vf_layer_size) in enumerate(zip_longest(policy_only_layers, value_only_layers)):
        if pi_layer_size is not None:
            assert isinstance(pi_layer_size, int), "Error: net_arch[-1]['pi'] must only contain integers."
            latent_policy = act_fun(linear(latent_policy, "pi_fc{}".format(idx), pi_layer_size, init_scale=np.sqrt(2)))

        if vf_layer_size is not None:
            assert isinstance(vf_layer_size, int), "Error: net_arch[-1]['vf'] must only contain integers."
            latent_value = act_fun(linear(latent_value, "vf_fc{}".format(idx), vf_layer_size, init_scale=np.sqrt(2)))

    return latent_policy, latent_value
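
# A minimal sketch of the ``net_arch`` format described above (placeholder shape and
# activation are illustrative assumptions): one shared 55-unit layer, then a value
# branch of [255, 255] and a policy branch of [128].
#
#     flat_obs = tf.placeholder(tf.float32, shape=(None, 10))
#     pi_latent, vf_latent = mlp_extractor(flat_obs, [55, dict(vf=[255, 255], pi=[128])], tf.tanh)
#     # pi_latent: (None, 128), vf_latent: (None, 255)
#     # With net_arch=[128, 128] (all layers shared), pi_latent and vf_latent are the same tensor.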


class BasePolicy(ABC):
    """
    The base policy object

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param scale: (bool) whether or not to scale the input
    :param obs_phs: (TensorFlow Tensor, TensorFlow Tensor) a tuple containing an override for observation placeholder
        and the processed observation placeholder respectively
    :param add_action_ph: (bool) whether or not to create an action placeholder
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, scale=False,
                 obs_phs=None, add_action_ph=False):
        self.n_env = n_env
        self.n_steps = n_steps
        with tf.variable_scope("input", reuse=False):
            if obs_phs is None:
                self.obs_ph, self.processed_obs = observation_input(ob_space, n_batch, scale=scale)
            else:
                self.obs_ph, self.processed_obs = obs_phs
            self.masks_ph = tf.placeholder(tf.float32, [n_batch], name="masks_ph")  # mask (done t-1)
            self.states_ph = tf.placeholder(tf.float32, [self.n_env, n_lstm * 2], name="states_ph")  # states
            self.action_ph = None
            if add_action_ph:
                self.action_ph = tf.placeholder(dtype=ac_space.dtype, shape=(None,) + ac_space.shape, name="action_ph")
        self.sess = sess
        self.reuse = reuse
        self.ob_space = ob_space
        self.ac_space = ac_space

    def step(self, obs, state=None, mask=None):
        """
        Returns the policy for a single step

        :param obs: ([float] or [int]) The current observation of the environment
        :param state: ([float]) The last states (used in recurrent policies)
        :param mask: ([float]) The last masks (used in recurrent policies)
        :return: ([float], [float], [float], [float]) actions, values, states, neglogp
        """
        raise NotImplementedError

    def proba_step(self, obs, state=None, mask=None):
        """
        Returns the action probability for a single step

        :param obs: ([float] or [int]) The current observation of the environment
        :param state: ([float]) The last states (used in recurrent policies)
        :param mask: ([float]) The last masks (used in recurrent policies)
        :return: ([float]) the action probability
        """
        raise NotImplementedError



class ActorCriticPolicy(BasePolicy):
    """
    Policy object that implements actor critic

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param scale: (bool) whether or not to scale the input
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, scale=False):
        super(ActorCriticPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=n_lstm,
                                                reuse=reuse, scale=scale)
        self.pdtype = make_proba_dist_type(ac_space)
        self.is_discrete = isinstance(ac_space, Discrete)
        self.policy = None
        self.proba_distribution = None
        self.value_fn = None
        self.deterministic_action = None
        self.initial_state = None

    def _setup_init(self):
        """
        Sets up the distributions, actions, and value
        """
        with tf.variable_scope("output", reuse=True):
            assert self.policy is not None and self.proba_distribution is not None and self.value_fn is not None
            self.action = self.proba_distribution.sample()
            self.deterministic_action = self.proba_distribution.mode()
            self.neglogp = self.proba_distribution.neglogp(self.action)
            self.policy_proba = self.policy
            if self.is_discrete:
                self.policy_proba = tf.nn.softmax(self.policy_proba)
            self._value = self.value_fn[:, 0]

    def step(self, obs, state=None, mask=None, deterministic=False):
        """
        Returns the policy for a single step

        :param obs: ([float] or [int]) The current observation of the environment
        :param state: ([float]) The last states (used in recurrent policies)
        :param mask: ([float]) The last masks (used in recurrent policies)
        :param deterministic: (bool) Whether or not to return deterministic actions.
        :return: ([float], [float], [float], [float]) actions, values, states, neglogp
        """
        raise NotImplementedError

    def proba_step(self, obs, state=None, mask=None):
        """
        Returns the action probability for a single step

        :param obs: ([float] or [int]) The current observation of the environment
        :param state: ([float]) The last states (used in recurrent policies)
        :param mask: ([float]) The last masks (used in recurrent policies)
        :return: ([float]) the action probability
        """
        raise NotImplementedError

    def value(self, obs, state=None, mask=None):
        """
        Returns the value for a single step

        :param obs: ([float] or [int]) The current observation of the environment
        :param state: ([float]) The last states (used in recurrent policies)
        :param mask: ([float]) The last masks (used in recurrent policies)
        :return: ([float]) The associated value of the action
        """
        raise NotImplementedError


class LstmPolicy(ActorCriticPolicy):
    """
    Policy object that implements actor critic, using LSTMs.

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param layers: ([int]) The size of the Neural network before the LSTM layer (if None, default to [64, 64])
    :param cnn_extractor: (function (TensorFlow Tensor, ``**kwargs``): (TensorFlow Tensor)) the CNN feature extraction
    :param layer_norm: (bool) Whether or not to use layer normalized LSTMs
    :param feature_extraction: (str) The feature extraction type ("cnn" or "mlp")
    :param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, layers=None,
                 cnn_extractor=nature_cnn, layer_norm=False, feature_extraction="cnn", **kwargs):
        super(LstmPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm, reuse,
                                         scale=(feature_extraction == "cnn"))

        if layers is None:
            layers = [64, 64]

        with tf.variable_scope("model", reuse=reuse):
            if feature_extraction == "cnn":
                extracted_features = cnn_extractor(self.processed_obs, **kwargs)
            else:
                activ = tf.tanh
                extracted_features = tf.layers.flatten(self.processed_obs)
                for i, layer_size in enumerate(layers):
                    extracted_features = activ(linear(extracted_features, 'pi_fc' + str(i), n_hidden=layer_size,
                                                      init_scale=np.sqrt(2)))
            input_sequence = batch_to_seq(extracted_features, self.n_env, n_steps)
            masks = batch_to_seq(self.masks_ph, self.n_env, n_steps)
            rnn_output, self.snew = lstm(input_sequence, masks, self.states_ph, 'lstm1', n_hidden=n_lstm,
                                         layer_norm=layer_norm)
            rnn_output = seq_to_batch(rnn_output)
            value_fn = linear(rnn_output, 'vf', 1)

            self.proba_distribution, self.policy, self.q_value = \
                self.pdtype.proba_distribution_from_latent(rnn_output, rnn_output)

        self.value_fn = value_fn
        self.initial_state = np.zeros((self.n_env, n_lstm * 2), dtype=np.float32)
        self._setup_init()

    def step(self, obs, state=None, mask=None, deterministic=False):
        if deterministic:
            return self.sess.run([self.deterministic_action, self._value, self.snew, self.neglogp],
                                 {self.obs_ph: obs, self.states_ph: state, self.masks_ph: mask})
        else:
            return self.sess.run([self.action, self._value, self.snew, self.neglogp],
                                 {self.obs_ph: obs, self.states_ph: state, self.masks_ph: mask})

    def proba_step(self, obs, state=None, mask=None):
        return self.sess.run(self.policy_proba,
                             {self.obs_ph: obs, self.states_ph: state, self.masks_ph: mask})

    def value(self, obs, state=None, mask=None):
        return self.sess.run(self._value, {self.obs_ph: obs, self.states_ph: state, self.masks_ph: mask})
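
# A rough usage sketch for the recurrent policy above (illustrative only; assumes a
# policy instance built with n_env=4 and an observation batch ``obs`` with one row per
# environment). The recurrent state has shape (n_env, 2 * n_lstm), and the mask marks
# environments whose previous step ended an episode, matching masks_ph ("done t-1").
#
#     states = lstm_policy.initial_state                  # zeros of shape (4, 2 * 256)
#     masks = np.zeros((4,), dtype=np.float32)            # 1.0 where the previous step was terminal
#     actions, values, states, neglogps = lstm_policy.step(obs, states, masks)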


class FeedForwardPolicy(ActorCriticPolicy):
    """
    Policy object that implements actor critic, using a feed forward neural network.

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param reuse: (bool) If the policy is reusable or not
    :param layers: ([int]) (deprecated, use net_arch instead) The size of the Neural network for the policy
        (if None, default to [64, 64])
    :param net_arch: (list) Specification of the actor-critic policy network architecture (see mlp_extractor
        documentation for details).
    :param act_fun: the activation function to use in the neural network.
    :param cnn_extractor: (function (TensorFlow Tensor, ``**kwargs``): (TensorFlow Tensor)) the CNN feature extraction
    :param feature_extraction: (str) The feature extraction type ("cnn" or "mlp")
    :param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=False, layers=None, net_arch=None,
                 act_fun=tf.tanh, cnn_extractor=nature_cnn, feature_extraction="cnn", **kwargs):
        super(FeedForwardPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256,
                                                reuse=reuse, scale=(feature_extraction == "cnn"))

        if layers is not None:
            warnings.warn("Usage of the `layers` parameter is deprecated! Use net_arch instead "
                          "(it has different semantics though).", DeprecationWarning)
            if net_arch is not None:
                warnings.warn("The new `net_arch` parameter overrides the deprecated `layers` parameter!",
                              DeprecationWarning)

        if net_arch is None:
            if layers is None:
                layers = [64, 64]
            net_arch = [dict(vf=layers, pi=layers)]

        with tf.variable_scope("model", reuse=reuse):
            if feature_extraction == "cnn":
                pi_latent = vf_latent = cnn_extractor(self.processed_obs, **kwargs)
            else:
                pi_latent, vf_latent = mlp_extractor(tf.layers.flatten(self.processed_obs), net_arch, act_fun)

            self.value_fn = linear(vf_latent, 'vf', 1)

            self.proba_distribution, self.policy, self.q_value = \
                self.pdtype.proba_distribution_from_latent(pi_latent, vf_latent, init_scale=0.01)

        self.initial_state = None
        self._setup_init()

    def step(self, obs, state=None, mask=None, deterministic=False):
        if deterministic:
            action, value, neglogp = self.sess.run([self.deterministic_action, self._value, self.neglogp],
                                                   {self.obs_ph: obs})
        else:
            action, value, neglogp = self.sess.run([self.action, self._value, self.neglogp],
                                                   {self.obs_ph: obs})
        return action, value, self.initial_state, neglogp

    def proba_step(self, obs, state=None, mask=None):
        return self.sess.run(self.policy_proba, {self.obs_ph: obs})

    def value(self, obs, state=None, mask=None):
        return self.sess.run(self._value, {self.obs_ph: obs})
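
# A hedged sketch of a custom feed forward policy (the class name and layer sizes are
# arbitrary examples, not part of this module): a shared 128-unit layer followed by
# separate pi/vf branches, built on the MLP extractor.
#
#     class CustomMlpPolicy(FeedForwardPolicy):
#         def __init__(self, *args, **kwargs):
#             super(CustomMlpPolicy, self).__init__(*args, **kwargs,
#                                                   net_arch=[128, dict(pi=[64], vf=[64])],
#                                                   feature_extraction="mlp")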


class CnnPolicy(FeedForwardPolicy):
    """
    Policy object that implements actor critic, using a CNN (the nature CNN)

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param reuse: (bool) If the policy is reusable or not
    :param _kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=False, **_kwargs):
        super(CnnPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse,
                                        feature_extraction="cnn", **_kwargs)


class CnnLstmPolicy(LstmPolicy):
    """
    Policy object that implements actor critic, using LSTMs with a CNN feature extraction

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, **_kwargs):
        super(CnnLstmPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm, reuse,
                                            layer_norm=False, feature_extraction="cnn", **_kwargs)


class CnnLnLstmPolicy(LstmPolicy):
    """
    Policy object that implements actor critic, using layer normalized LSTMs with a CNN feature extraction

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, **_kwargs):
        super(CnnLnLstmPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm, reuse,
                                              layer_norm=True, feature_extraction="cnn", **_kwargs)


class MlpPolicy(FeedForwardPolicy):
    """
    Policy object that implements actor critic, using an MLP (2 layers of 64)

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param reuse: (bool) If the policy is reusable or not
    :param _kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=False, **_kwargs):
        super(MlpPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse,
                                        feature_extraction="mlp", **_kwargs)
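
# A minimal end-to-end sketch (assumes the rest of stable_baselines is installed and a
# Gym environment id is available; 'CartPole-v1' and the timestep budget are only examples):
#
#     from stable_baselines import A2C
#     model = A2C(MlpPolicy, 'CartPole-v1')
#     model.learn(total_timesteps=10000)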


class MlpLstmPolicy(LstmPolicy):
    """
    Policy object that implements actor critic, using LSTMs with an MLP feature extraction

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, **_kwargs):
        super(MlpLstmPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm, reuse,
                                            layer_norm=False, feature_extraction="mlp", **_kwargs)


class MlpLnLstmPolicy(LstmPolicy):
    """
    Policy object that implements actor critic, using layer normalized LSTMs with an MLP feature extraction

    :param sess: (TensorFlow session) The current TensorFlow session
    :param ob_space: (Gym Space) The observation space of the environment
    :param ac_space: (Gym Space) The action space of the environment
    :param n_env: (int) The number of environments to run
    :param n_steps: (int) The number of steps to run for each environment
    :param n_batch: (int) The number of batches to run (n_envs * n_steps)
    :param n_lstm: (int) The number of LSTM cells (for recurrent policies)
    :param reuse: (bool) If the policy is reusable or not
    :param kwargs: (dict) Extra keyword arguments for the nature CNN feature extraction
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm=256, reuse=False, **_kwargs):
        super(MlpLnLstmPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, n_lstm, reuse,
                                              layer_norm=True, feature_extraction="mlp", **_kwargs)


_policy_registry = {
    ActorCriticPolicy: {
        "CnnPolicy": CnnPolicy,
        "CnnLstmPolicy": CnnLstmPolicy,
        "CnnLnLstmPolicy": CnnLnLstmPolicy,
        "MlpPolicy": MlpPolicy,
        "MlpLstmPolicy": MlpLstmPolicy,
        "MlpLnLstmPolicy": MlpLnLstmPolicy,
    }
}


def get_policy_from_name(base_policy_type, name):
    """
    Returns the registered policy from the base type and name

    :param base_policy_type: (BasePolicy) the base policy object
    :param name: (str) the policy name
    :return: (base_policy_type) the policy
    """
    if base_policy_type not in _policy_registry:
        raise ValueError("Error: the policy type {} is not registered!".format(base_policy_type))
    if name not in _policy_registry[base_policy_type]:
        raise ValueError("Error: unknown policy type {}, the only registered policy types are: {}!"
                         .format(name, list(_policy_registry[base_policy_type].keys())))
    return _policy_registry[base_policy_type][name]


def register_policy(name, policy):
    """
    Registers a policy under the given name, so it can later be retrieved with get_policy_from_name

    :param name: (str) the policy name
    :param policy: (subclass of BasePolicy) the policy
    """
    sub_class = None
    for cls in BasePolicy.__subclasses__():
        if issubclass(policy, cls):
            sub_class = cls
            break
    if sub_class is None:
        raise ValueError("Error: the policy {} is not of any known subclasses of BasePolicy!".format(policy))

    if sub_class not in _policy_registry:
        _policy_registry[sub_class] = {}
    if name in _policy_registry[sub_class]:
        raise ValueError("Error: the name {} is already registered for a different policy, will not override."
                         .format(name))
    _policy_registry[sub_class][name] = policy
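
# A hedged example of the registry helpers above (CustomMlpPolicy refers to the
# illustrative subclass sketched after FeedForwardPolicy, not a class in this module):
#
#     register_policy('CustomMlpPolicy', CustomMlpPolicy)
#     policy_cls = get_policy_from_name(ActorCriticPolicy, 'CustomMlpPolicy')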