Source code for stable_baselines.common.base_class

from abc import ABC, abstractmethod
import os
import glob
import warnings

import cloudpickle
import numpy as np
import gym
import tensorflow as tf

from stable_baselines.common import set_global_seeds
from stable_baselines.common.policies import LstmPolicy, get_policy_from_name, ActorCriticPolicy
from stable_baselines.common.vec_env import VecEnvWrapper, VecEnv, DummyVecEnv
from stable_baselines import logger


class BaseRLModel(ABC):
    """
    The base RL model

    :param policy: (BasePolicy) Policy object
    :param env: (Gym environment) The environment to learn from
        (if registered in Gym, can be str. Can be None for loading trained models)
    :param verbose: (int) the verbosity level: 0 none, 1 training information, 2 tensorflow debug
    :param requires_vec_env: (bool) Does this model require a vectorized environment
    :param policy_base: (BasePolicy) the base policy used by this method
    """

    def __init__(self, policy, env, verbose=0, *, requires_vec_env, policy_base, policy_kwargs=None):
        if isinstance(policy, str):
            self.policy = get_policy_from_name(policy_base, policy)
        else:
            self.policy = policy
        self.env = env
        self.verbose = verbose
        self._requires_vec_env = requires_vec_env
        self.policy_kwargs = {} if policy_kwargs is None else policy_kwargs
        self.observation_space = None
        self.action_space = None
        self.n_envs = None
        self._vectorize_action = False

        if env is not None:
            if isinstance(env, str):
                if self.verbose >= 1:
                    print("Creating environment from the given name, wrapped in a DummyVecEnv.")
                self.env = env = DummyVecEnv([lambda: gym.make(env)])

            self.observation_space = env.observation_space
            self.action_space = env.action_space
            if requires_vec_env:
                if isinstance(env, VecEnv):
                    self.n_envs = env.num_envs
                else:
                    raise ValueError("Error: the model requires a vectorized environment, "
                                     "please use a VecEnv wrapper.")
            else:
                if isinstance(env, VecEnv):
                    if env.num_envs == 1:
                        self.env = _UnvecWrapper(env)
                        self._vectorize_action = True
                    else:
                        raise ValueError("Error: the model requires a non vectorized environment or a single "
                                         "vectorized environment.")
                self.n_envs = 1
    def get_env(self):
        """
        Returns the current environment (can be None if not defined).

        :return: (Gym Environment) The current environment
        """
        return self.env
    def set_env(self, env):
        """
        Checks the validity of the environment, and if it is coherent, set it as the current environment.

        :param env: (Gym Environment) The environment for learning a policy
        """
        if env is None and self.env is None:
            if self.verbose >= 1:
                print("Loading a model without an environment, "
                      "this model cannot be trained until it has a valid environment.")
            return
        elif env is None:
            raise ValueError("Error: trying to replace the current environment with None")

        # sanity checking the environment
        assert self.observation_space == env.observation_space, \
            "Error: the environment passed must have at least the same observation space as the model was trained on."
        assert self.action_space == env.action_space, \
            "Error: the environment passed must have at least the same action space as the model was trained on."
        if self._requires_vec_env:
            assert isinstance(env, VecEnv), \
                "Error: the environment passed is not a vectorized environment, however {} requires it".format(
                    self.__class__.__name__)
            assert not issubclass(self.policy, LstmPolicy) or self.n_envs == env.num_envs, \
                "Error: the environment passed must have the same number of environments as the model was " \
                "trained on. This is due to the Lstm policy not being capable of changing the number of environments."
            self.n_envs = env.num_envs
        else:
            # for models that don't want a vectorized environment, check if they make sense and adapt them.
            # Otherwise tell the user about this issue
            if isinstance(env, VecEnv):
                if env.num_envs == 1:
                    env = _UnvecWrapper(env)
                    self._vectorize_action = True
                else:
                    raise ValueError("Error: the model requires a non vectorized environment or a single "
                                     "vectorized environment.")
            else:
                self._vectorize_action = False

            self.n_envs = 1

        self.env = env
    @abstractmethod
    def setup_model(self):
        """
        Create all the functions and tensorflow graphs necessary to train the model
        """
        pass
    def _setup_learn(self, seed):
        """
        Check the environment, set the seed, and set the logger.

        :param seed: (int) the seed value
        """
        if self.env is None:
            raise ValueError("Error: cannot train the model without a valid environment, please set an environment "
                             "with the set_env(self, env) method.")
        if seed is not None:
            set_global_seeds(seed)
    @abstractmethod
    def learn(self, total_timesteps, callback=None, seed=None, log_interval=100, tb_log_name="run"):
        """
        Return a trained model.

        :param total_timesteps: (int) The total number of samples to train on
        :param seed: (int) The initial seed for training, if None: keep current seed
        :param callback: (function (dict, dict) -> boolean) function called at every step with the state of
            the algorithm. It takes the local and global variables. If it returns False, training is aborted.
        :param log_interval: (int) The number of timesteps before logging.
        :param tb_log_name: (str) the name of the run for tensorboard log
        :return: (BaseRLModel) the trained model
        """
        pass
    @abstractmethod
    def predict(self, observation, state=None, mask=None, deterministic=False):
        """
        Get the model's action from an observation

        :param observation: (np.ndarray) the input observation
        :param state: (np.ndarray) The last states (can be None, used in recurrent policies)
        :param mask: (np.ndarray) The last masks (can be None, used in recurrent policies)
        :param deterministic: (bool) Whether or not to return deterministic actions.
        :return: (np.ndarray, np.ndarray) the model's action and the next state (used in recurrent policies)
        """
        pass
    @abstractmethod
    def action_probability(self, observation, state=None, mask=None, actions=None):
        """
        If ``actions`` is ``None``, then get the model's action probability distribution from a given observation.

        Depending on the action space the output is:
            - Discrete: probability for each possible action
            - Box: mean and standard deviation of the action output

        However if ``actions`` is not ``None``, this function will return the probability that the given actions are
        taken with the given parameters (observation, state, ...) on this model.

        .. warning::
            When working with continuous probability distribution (e.g. Gaussian distribution for continuous action)
            the probability of taking a particular action is exactly zero.
            See http://blog.christianperone.com/2019/01/ for a good explanation

        :param observation: (np.ndarray) the input observation
        :param state: (np.ndarray) The last states (can be None, used in recurrent policies)
        :param mask: (np.ndarray) The last masks (can be None, used in recurrent policies)
        :param actions: (np.ndarray) (OPTIONAL) For calculating the likelihood that the given actions are chosen by
            the model for each of the given parameters. Must have the same number of actions and observations.
            (set to None to return the complete action probability distribution)
        :return: (np.ndarray) the model's action probability
        """
        pass
    @abstractmethod
    def save(self, save_path):
        """
        Save the current parameters to file

        :param save_path: (str or file-like object) the save location
        """
        # self._save_to_file(save_path, data={}, params=None)
        raise NotImplementedError()
    @classmethod
    @abstractmethod
    def load(cls, load_path, env=None, **kwargs):
        """
        Load the model from file

        :param load_path: (str or file-like) the saved parameter location
        :param env: (Gym Environment) the new environment to run the loaded model on
            (can be None if you only need prediction from a trained model)
        :param kwargs: extra arguments to change the model when loading
        """
        # data, param = cls._load_from_file(load_path)
        raise NotImplementedError()
    @staticmethod
    def _save_to_file(save_path, data=None, params=None):
        if isinstance(save_path, str):
            _, ext = os.path.splitext(save_path)
            if ext == "":
                save_path += ".pkl"

            with open(save_path, "wb") as file_:
                cloudpickle.dump((data, params), file_)
        else:
            # Here save_path is a file-like object, not a path
            cloudpickle.dump((data, params), save_path)

    @staticmethod
    def _load_from_file(load_path):
        if isinstance(load_path, str):
            if not os.path.exists(load_path):
                if os.path.exists(load_path + ".pkl"):
                    load_path += ".pkl"
                else:
                    raise ValueError("Error: the file {} could not be found".format(load_path))

            with open(load_path, "rb") as file:
                data, params = cloudpickle.load(file)
        else:
            # Here load_path is a file-like object, not a path
            data, params = cloudpickle.load(load_path)

        return data, params

    @staticmethod
    def _softmax(x_input):
        """
        An implementation of softmax.

        :param x_input: (numpy float) input vector
        :return: (numpy float) output vector
        """
        x_exp = np.exp(x_input.T - np.max(x_input.T, axis=0))
        return (x_exp / x_exp.sum(axis=0)).T

    @staticmethod
    def _is_vectorized_observation(observation, observation_space):
        """
        For every observation type, detects and validates the shape,
        then returns whether or not the observation is vectorized.

        :param observation: (np.ndarray) the input observation to validate
        :param observation_space: (gym.spaces) the observation space
        :return: (bool) whether the given observation is vectorized or not
        """
        if isinstance(observation_space, gym.spaces.Box):
            if observation.shape == observation_space.shape:
                return False
            elif observation.shape[1:] == observation_space.shape:
                return True
            else:
                raise ValueError("Error: Unexpected observation shape {} for ".format(observation.shape) +
                                 "Box environment, please use {} ".format(observation_space.shape) +
                                 "or (n_env, {}) for the observation shape."
                                 .format(", ".join(map(str, observation_space.shape))))
        elif isinstance(observation_space, gym.spaces.Discrete):
            if observation.shape == ():  # A numpy array of a number, has shape empty tuple '()'
                return False
            elif len(observation.shape) == 1:
                return True
            else:
                raise ValueError("Error: Unexpected observation shape {} for ".format(observation.shape) +
                                 "Discrete environment, please use (1,) or (n_env, 1) for the observation shape.")
        elif isinstance(observation_space, gym.spaces.MultiDiscrete):
            if observation.shape == (len(observation_space.nvec),):
                return False
            elif len(observation.shape) == 2 and observation.shape[1] == len(observation_space.nvec):
                return True
            else:
                raise ValueError("Error: Unexpected observation shape {} for MultiDiscrete ".format(observation.shape) +
                                 "environment, please use ({},) or ".format(len(observation_space.nvec)) +
                                 "(n_env, {}) for the observation shape.".format(len(observation_space.nvec)))
        elif isinstance(observation_space, gym.spaces.MultiBinary):
            if observation.shape == (observation_space.n,):
                return False
            elif len(observation.shape) == 2 and observation.shape[1] == observation_space.n:
                return True
            else:
                raise ValueError("Error: Unexpected observation shape {} for MultiBinary ".format(observation.shape) +
                                 "environment, please use ({},) or ".format(observation_space.n) +
                                 "(n_env, {}) for the observation shape.".format(observation_space.n))
        else:
            raise ValueError("Error: Cannot determine if the observation is vectorized with the space type {}."
                             .format(observation_space))
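As a quick illustration of the last two static helpers, here is a minimal sketch (not part of the library source) of how `_softmax` normalizes a batch of logits and how `_is_vectorized_observation` distinguishes a single observation from a batch of observations for a ``Box`` space. The shapes and values are purely illustrative, and both helpers are private, so this is only meant to clarify their behavior:

    import numpy as np
    import gym

    from stable_baselines.common.base_class import BaseRLModel

    # Softmax over a batch of logits: each row of the output sums to 1
    logits = np.array([[1.0, 2.0, 3.0], [0.0, 0.0, 0.0]])
    probs = BaseRLModel._softmax(logits)
    print(probs.sum(axis=1))  # -> [1. 1.]

    # Shape detection for a Box observation space of shape (4,)
    space = gym.spaces.Box(low=-1.0, high=1.0, shape=(4,), dtype=np.float32)
    single_obs = np.zeros(4)        # shape (4,)    -> not vectorized
    batch_obs = np.zeros((8, 4))    # shape (8, 4)  -> vectorized (n_env = 8)
    print(BaseRLModel._is_vectorized_observation(single_obs, space))  # False
    print(BaseRLModel._is_vectorized_observation(batch_obs, space))   # True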
class ActorCriticRLModel(BaseRLModel):
    """
    The base class for Actor critic model

    :param policy: (BasePolicy) Policy object
    :param env: (Gym environment) The environment to learn from
        (if registered in Gym, can be str. Can be None for loading trained models)
    :param verbose: (int) the verbosity level: 0 none, 1 training information, 2 tensorflow debug
    :param policy_base: (BasePolicy) the base policy used by this method (default=ActorCriticPolicy)
    :param requires_vec_env: (bool) Does this model require a vectorized environment
    """

    def __init__(self, policy, env, _init_setup_model, verbose=0, policy_base=ActorCriticPolicy,
                 requires_vec_env=False, policy_kwargs=None):
        super(ActorCriticRLModel, self).__init__(policy, env, verbose=verbose, requires_vec_env=requires_vec_env,
                                                 policy_base=policy_base, policy_kwargs=policy_kwargs)

        self.sess = None
        self.initial_state = None
        self.step = None
        self.proba_step = None
        self.params = None

    @abstractmethod
    def setup_model(self):
        pass

    @abstractmethod
    def learn(self, total_timesteps, callback=None, seed=None, log_interval=100, tb_log_name="run"):
        pass

    def predict(self, observation, state=None, mask=None, deterministic=False):
        if state is None:
            state = self.initial_state
        if mask is None:
            mask = [False for _ in range(self.n_envs)]
        observation = np.array(observation)
        vectorized_env = self._is_vectorized_observation(observation, self.observation_space)

        observation = observation.reshape((-1,) + self.observation_space.shape)
        actions, _, states, _ = self.step(observation, state, mask, deterministic=deterministic)

        clipped_actions = actions
        # Clip the actions to avoid out of bound error
        if isinstance(self.action_space, gym.spaces.Box):
            clipped_actions = np.clip(actions, self.action_space.low, self.action_space.high)

        if not vectorized_env:
            if state is not None:
                raise ValueError("Error: The environment must be vectorized when using recurrent policies.")
            clipped_actions = clipped_actions[0]

        return clipped_actions, states

    def action_probability(self, observation, state=None, mask=None, actions=None):
        if state is None:
            state = self.initial_state
        if mask is None:
            mask = [False for _ in range(self.n_envs)]
        observation = np.array(observation)
        vectorized_env = self._is_vectorized_observation(observation, self.observation_space)

        observation = observation.reshape((-1,) + self.observation_space.shape)
        actions_proba = self.proba_step(observation, state, mask)

        if len(actions_proba) == 0:  # empty list means not implemented
            warnings.warn("Warning: action probability is not implemented for {} action space. Returning None."
                          .format(type(self.action_space).__name__))
            return None

        if actions is not None:  # comparing the action distribution, to given actions
            actions = np.array([actions])
            if isinstance(self.action_space, gym.spaces.Discrete):
                actions = actions.reshape((-1,))
                assert observation.shape[0] == actions.shape[0], \
                    "Error: batch sizes differ for actions and observations."
                actions_proba = actions_proba[np.arange(actions.shape[0]), actions]
            elif isinstance(self.action_space, gym.spaces.MultiDiscrete):
                actions = actions.reshape((-1, len(self.action_space.nvec)))
                assert observation.shape[0] == actions.shape[0], \
                    "Error: batch sizes differ for actions and observations."
                # Discrete action probability, over multiple categories
                actions = np.swapaxes(actions, 0, 1)  # swap axis for easier categorical split
                actions_proba = np.prod([proba[np.arange(act.shape[0]), act]
                                         for proba, act in zip(actions_proba, actions)], axis=0)
            elif isinstance(self.action_space, gym.spaces.MultiBinary):
                actions = actions.reshape((-1, self.action_space.n))
                assert observation.shape[0] == actions.shape[0], \
                    "Error: batch sizes differ for actions and observations."
                # Bernoulli action probability, for every action
                actions_proba = np.prod(actions_proba * actions + (1 - actions_proba) * (1 - actions), axis=1)
            elif isinstance(self.action_space, gym.spaces.Box):
                warnings.warn("The probability of taking a given action is exactly zero for a continuous "
                              "distribution. See http://blog.christianperone.com/2019/01/ for a good explanation")
                actions_proba = np.zeros((observation.shape[0], 1), dtype=np.float32)
            else:
                warnings.warn("Warning: action_probability not implemented for {} action space. Returning None."
                              .format(type(self.action_space).__name__))
                return None

            # normalize action proba shape for the different gym spaces
            actions_proba = actions_proba.reshape((-1, 1))

        if not vectorized_env:
            if state is not None:
                raise ValueError("Error: The environment must be vectorized when using recurrent policies.")
            actions_proba = actions_proba[0]

        return actions_proba

    @abstractmethod
    def save(self, save_path):
        pass

    @classmethod
    def load(cls, load_path, env=None, **kwargs):
        data, params = cls._load_from_file(load_path)

        if 'policy_kwargs' in kwargs and kwargs['policy_kwargs'] != data['policy_kwargs']:
            raise ValueError("The specified policy kwargs do not equal the stored policy kwargs. "
                             "Stored kwargs: {}, specified kwargs: {}".format(data['policy_kwargs'],
                                                                              kwargs['policy_kwargs']))

        model = cls(policy=data["policy"], env=None, _init_setup_model=False)
        model.__dict__.update(data)
        model.__dict__.update(kwargs)
        model.set_env(env)
        model.setup_model()

        restores = []
        for param, loaded_p in zip(model.params, params):
            restores.append(param.assign(loaded_p))
        model.sess.run(restores)

        return model


class OffPolicyRLModel(BaseRLModel):
    """
    The base class for off policy RL model

    :param policy: (BasePolicy) Policy object
    :param env: (Gym environment) The environment to learn from (if registered in Gym, can be str.
        Can be None for loading trained models)
    :param replay_buffer: (ReplayBuffer) the type of replay buffer
    :param verbose: (int) the verbosity level: 0 none, 1 training information, 2 tensorflow debug
    :param requires_vec_env: (bool) Does this model require a vectorized environment
    :param policy_base: (BasePolicy) the base policy used by this method
    """

    def __init__(self, policy, env, replay_buffer, verbose=0, *, requires_vec_env, policy_base, policy_kwargs=None):
        super(OffPolicyRLModel, self).__init__(policy, env, verbose=verbose, requires_vec_env=requires_vec_env,
                                               policy_base=policy_base, policy_kwargs=policy_kwargs)

        self.replay_buffer = replay_buffer

    @abstractmethod
    def setup_model(self):
        pass

    @abstractmethod
    def learn(self, total_timesteps, callback=None, seed=None, log_interval=100, tb_log_name="run"):
        pass

    @abstractmethod
    def predict(self, observation, state=None, mask=None, deterministic=False):
        pass

    @abstractmethod
    def action_probability(self, observation, state=None, mask=None, actions=None):
        pass

    @abstractmethod
    def save(self, save_path):
        pass

    @classmethod
    @abstractmethod
    def load(cls, load_path, env=None, **kwargs):
        pass


class _UnvecWrapper(VecEnvWrapper):
    def __init__(self, venv):
        """
        Unvectorize a vectorized environment, for vectorized environments that only have one environment

        :param venv: (VecEnv) the vectorized environment to wrap
        """
        super().__init__(venv)
        assert venv.num_envs == 1, "Error: cannot unwrap an environment wrapper that has more than one environment."

    def reset(self):
        return self.venv.reset()[0]

    def step_async(self, actions):
        self.venv.step_async([actions])

    def step_wait(self):
        actions, values, states, information = self.venv.step_wait()
        return actions[0], float(values[0]), states[0], information[0]

    def render(self, mode='human'):
        return self.venv.render(mode=mode)


class SetVerbosity:
    def __init__(self, verbose=0):
        """
        Define a region of code for a certain level of verbosity

        :param verbose: (int) the verbosity level: 0 none, 1 training information, 2 tensorflow debug
        """
        self.verbose = verbose

    def __enter__(self):
        self.tf_level = os.environ.get('TF_CPP_MIN_LOG_LEVEL', '0')
        self.log_level = logger.get_level()
        self.gym_level = gym.logger.MIN_LEVEL

        if self.verbose <= 1:
            os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

        if self.verbose <= 0:
            logger.set_level(logger.DISABLED)
            gym.logger.set_level(gym.logger.DISABLED)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.verbose <= 1:
            os.environ['TF_CPP_MIN_LOG_LEVEL'] = self.tf_level

        if self.verbose <= 0:
            logger.set_level(self.log_level)
            gym.logger.set_level(self.gym_level)


class TensorboardWriter:
    def __init__(self, graph, tensorboard_log_path, tb_log_name):
        """
        Create a Tensorboard writer for a code segment, and saves it to the log directory as its own run

        :param graph: (Tensorflow Graph) the model graph
        :param tensorboard_log_path: (str) the save path for the log (can be None for no logging)
        :param tb_log_name: (str) the name of the run for tensorboard log
        """
        self.graph = graph
        self.tensorboard_log_path = tensorboard_log_path
        self.tb_log_name = tb_log_name
        self.writer = None

    def __enter__(self):
        if self.tensorboard_log_path is not None:
            save_path = os.path.join(self.tensorboard_log_path,
                                     "{}_{}".format(self.tb_log_name, self._get_latest_run_id() + 1))
            self.writer = tf.summary.FileWriter(save_path, graph=self.graph)
        return self.writer

    def _get_latest_run_id(self):
        """
        Returns the latest run number for the given log name and log path,
        by finding the greatest number in the directories.
        :return: (int) latest run number
        """
        max_run_id = 0
        for path in glob.glob(self.tensorboard_log_path + "/{}_[0-9]*".format(self.tb_log_name)):
            file_name = path.split("/")[-1]
            ext = file_name.split("_")[-1]
            if self.tb_log_name == "_".join(file_name.split("_")[:-1]) and ext.isdigit() and int(ext) > max_run_id:
                max_run_id = int(ext)
        return max_run_id

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.writer is not None:
            self.writer.add_graph(self.graph)
            self.writer.flush()
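The two context managers above are meant to wrap training code. Below is a minimal, hedged sketch (not part of the module) of how they can be combined: ``SetVerbosity`` silences TensorFlow and gym logging according to the verbosity level, while ``TensorboardWriter`` opens a ``tf.summary.FileWriter`` in a fresh ``<tb_log_name>_<run_id>`` subdirectory and flushes it on exit. The graph, log path, and summary value used here are illustrative placeholders only:

    import tensorflow as tf

    from stable_baselines.common.base_class import SetVerbosity, TensorboardWriter

    graph = tf.Graph()  # placeholder graph; a real model would build its ops here

    with SetVerbosity(verbose=0), TensorboardWriter(graph, "/tmp/tb_logs", "run") as writer:
        # writer is None when tensorboard_log_path is None, so guard before using it
        if writer is not None:
            summary = tf.Summary(value=[tf.Summary.Value(tag="example/value", simple_value=1.0)])
            writer.add_summary(summary, global_step=0)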