Source code for stable_baselines.common.callbacks

import os
from abc import ABC
import warnings
import typing
from typing import Union, List, Dict, Any, Optional

import gym
import numpy as np

from stable_baselines.common.vec_env import VecEnv, sync_envs_normalization, DummyVecEnv
from stable_baselines.common.evaluation import evaluate_policy
from stable_baselines import logger

if typing.TYPE_CHECKING:
    from stable_baselines.common.base_class import BaseRLModel  # pytype: disable=pyi-error


class BaseCallback(ABC):
    """
    Base class for callback.

    Public ``on_*`` methods are the entry points invoked during training;
    each one delegates to a private ``_on_*`` hook that subclasses override.

    :param verbose: (int)
    """

    def __init__(self, verbose: int = 0):
        super(BaseCallback, self).__init__()
        # The RL model
        self.model = None  # type: Optional[BaseRLModel]
        # An alias for self.model.get_env(), the environment used for training
        self.training_env = None  # type: Union[gym.Env, VecEnv, None]
        # Number of time the callback was called
        self.n_calls = 0  # type: int
        # n_envs * n times env.step() was called
        self.num_timesteps = 0  # type: int
        self.verbose = verbose
        self.locals = None  # type: Optional[Dict[str, Any]]
        self.globals = None  # type: Optional[Dict[str, Any]]
        self.logger = None  # type: Optional[logger.Logger]
        # Sometimes, for event callback, it is useful
        # to have access to the parent object
        self.parent = None  # type: Optional[BaseCallback]

    # Type hint as string to avoid circular import
    def init_callback(self, model: 'BaseRLModel') -> None:
        """
        Initialize the callback by saving references to the
        RL model and the training environment for convenience.

        :param model: (BaseRLModel) the model this callback is attached to
        """
        self.model = model
        self.training_env = model.get_env()
        self.logger = logger.Logger.CURRENT
        self._init_callback()

    def update_locals(self, locals_: Dict[str, Any]) -> None:
        """
        Updates the local variables of the training process

        For reference to which variables are accessible,
        check each individual algorithm's documentation

        :param `locals_`: (Dict[str, Any]) current local variables
        """
        if self.locals is None:
            # `on_training_start` has not run yet, so there is nothing to
            # update in place; start from a private copy instead of failing
            # with an AttributeError on `None`.
            self.locals = dict(locals_)
        else:
            self.locals.update(locals_)

    def _init_callback(self) -> None:
        """Hook run at the end of `init_callback`; does nothing by default."""
        pass

    def on_training_start(self, locals_: Dict[str, Any], globals_: Dict[str, Any]) -> None:
        """
        Store the training loop's variables and run the `_on_training_start` hook.

        :param `locals_`: (Dict[str, Any]) local variables of the training process
        :param `globals_`: (Dict[str, Any]) global variables of the training process
        """
        # Those are reference and will be updated automatically
        self.locals = locals_
        self.globals = globals_
        self._on_training_start()

    def _on_training_start(self) -> None:
        """Hook run by `on_training_start`; does nothing by default."""
        pass

    def on_rollout_start(self) -> None:
        """Run the `_on_rollout_start` hook."""
        self._on_rollout_start()

    def _on_rollout_start(self) -> None:
        """Hook run by `on_rollout_start`; does nothing by default."""
        pass

    def _on_step(self) -> bool:
        """
        :return: (bool) If the callback returns False, training is aborted early.
        """
        return True

    def on_step(self) -> bool:
        """
        This method will be called by the model after each call to `env.step()`.

        For child callback (of an `EventCallback`), this will be called
        when the event is triggered.

        :return: (bool) If the callback returns False, training is aborted early.
        """
        self.n_calls += 1
        # Mirror the model's counter so hooks can read it from the callback
        self.num_timesteps = self.model.num_timesteps

        return self._on_step()

    def on_training_end(self) -> None:
        """Run the `_on_training_end` hook."""
        self._on_training_end()

    def _on_training_end(self) -> None:
        """Hook run by `on_training_end`; does nothing by default."""
        pass

    def on_rollout_end(self) -> None:
        """Run the `_on_rollout_end` hook."""
        self._on_rollout_end()

    def _on_rollout_end(self) -> None:
        """Hook run by `on_rollout_end`; does nothing by default."""
        pass
class EventCallback(BaseCallback):
    """
    Base class for triggering callback on event.

    Wraps an optional child callback whose `on_step()` is invoked
    through `_on_event()` whenever the subclass decides the event occurred.

    :param callback: (Optional[BaseCallback]) Callback that will be called
        when an event is triggered.
    :param verbose: (int)
    """

    def __init__(self, callback: Optional[BaseCallback] = None, verbose: int = 0):
        super().__init__(verbose=verbose)
        self.callback = callback
        if callback is None:
            return
        # Give access to the parent
        callback.parent = self

    def init_callback(self, model: 'BaseRLModel') -> None:
        """
        Initialize this callback, then the child callback (if any) with the same model.

        :param model: (BaseRLModel)
        """
        super().init_callback(model)
        child = self.callback
        if child is not None:
            child.init_callback(self.model)

    def _on_training_start(self) -> None:
        """Forward the stored locals/globals to the child callback, if any."""
        child = self.callback
        if child is not None:
            child.on_training_start(self.locals, self.globals)

    def _on_event(self) -> bool:
        """
        Trigger the child callback.

        :return: (bool) The child's `on_step()` result, or True when there is no child.
        """
        if self.callback is None:
            return True
        return self.callback.on_step()

    def _on_step(self) -> bool:
        """
        :return: (bool) Always True; subclasses decide when to call `_on_event()`.
        """
        return True
class CallbackList(BaseCallback):
    """
    Class for chaining callbacks.

    Every lifecycle hook is forwarded, in list order, to each contained callback.

    :param callbacks: (List[BaseCallback]) A list of callbacks that will be called
        sequentially.
    """

    def __init__(self, callbacks: List[BaseCallback]):
        super().__init__()
        assert isinstance(callbacks, list)
        self.callbacks = callbacks

    def _init_callback(self) -> None:
        for child in self.callbacks:
            child.init_callback(self.model)

    def _on_training_start(self) -> None:
        for child in self.callbacks:
            child.on_training_start(self.locals, self.globals)

    def _on_rollout_start(self) -> None:
        for child in self.callbacks:
            child.on_rollout_start()

    def _on_step(self) -> bool:
        """
        Call `on_step()` on every callback (none is skipped).

        :return: (bool) True unless at least one callback returned a falsy value,
            in which case training should stop.
        """
        keep_going = True
        for child in self.callbacks:
            outcome = child.on_step()
            # A falsy result from any callback overrides the running verdict
            if not outcome:
                keep_going = outcome
        return keep_going

    def _on_rollout_end(self) -> None:
        for child in self.callbacks:
            child.on_rollout_end()

    def _on_training_end(self) -> None:
        for child in self.callbacks:
            child.on_training_end()
class CheckpointCallback(BaseCallback):
    """
    Callback for saving a model every `save_freq` steps

    The checkpoint file name is built from `name_prefix` and the current
    number of timesteps.

    :param save_freq: (int)
    :param save_path: (str) Path to the folder where the model will be saved.
    :param name_prefix: (str) Common prefix to the saved models
    """

    def __init__(self, save_freq: int, save_path: str, name_prefix='rl_model', verbose=0):
        super().__init__(verbose)
        self.save_freq = save_freq
        self.save_path = save_path
        self.name_prefix = name_prefix

    def _init_callback(self) -> None:
        # Nothing to create when no folder was given
        if self.save_path is None:
            return
        os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        """
        Save the model when the call counter is a multiple of `save_freq`.

        :return: (bool) Always True (never stops training).
        """
        if self.n_calls % self.save_freq != 0:
            return True

        checkpoint_name = '{}_{}_steps'.format(self.name_prefix, self.num_timesteps)
        path = os.path.join(self.save_path, checkpoint_name)
        self.model.save(path)
        if self.verbose > 1:
            print("Saving model checkpoint to {}".format(path))
        return True
class ConvertCallback(BaseCallback):
    """
    Convert functional callback (old-style) to object.

    The wrapped callable is invoked with the stored locals and globals
    on every step.

    :param callback: (Callable)
    :param verbose: (int)
    """

    def __init__(self, callback, verbose=0):
        super().__init__(verbose)
        self.callback = callback

    def _on_step(self) -> bool:
        """
        :return: (bool) Whatever the wrapped callable returns, or True when
            no callable was provided.
        """
        if self.callback is None:
            return True
        return self.callback(self.locals, self.globals)
class EvalCallback(EventCallback):
    """
    Callback for evaluating an agent.

    :param eval_env: (Union[gym.Env, VecEnv]) The environment used for evaluation.
        A non-vectorized env is wrapped in a `DummyVecEnv`; only one environment is accepted.
    :param callback_on_new_best: (Optional[BaseCallback]) Callback to trigger
        when there is a new best model according to the `mean_reward`
    :param n_eval_episodes: (int) The number of episodes to test the agent
    :param eval_freq: (int) Evaluate the agent every eval_freq call of the callback.
        A value <= 0 disables evaluation.
    :param log_path: (str) Path to a folder where the evaluations (`evaluations.npz`)
        will be saved. It will be updated at each evaluation.
    :param best_model_save_path: (str) Path to a folder where the best model
        according to performance on the eval env will be saved.
    :param deterministic: (bool) Whether the evaluation should
        use deterministic (True) or stochastic (False) actions.
    :param render: (bool) Whether to render or not the environment during evaluation
    :param verbose: (int)
    """

    def __init__(self, eval_env: Union[gym.Env, VecEnv],
                 callback_on_new_best: Optional[BaseCallback] = None,
                 n_eval_episodes: int = 5,
                 eval_freq: int = 10000,
                 log_path: Optional[str] = None,
                 best_model_save_path: Optional[str] = None,
                 deterministic: bool = True,
                 render: bool = False,
                 verbose: int = 1):
        super(EvalCallback, self).__init__(callback_on_new_best, verbose=verbose)
        self.n_eval_episodes = n_eval_episodes
        self.eval_freq = eval_freq
        self.best_mean_reward = -np.inf
        self.last_mean_reward = -np.inf
        self.deterministic = deterministic
        self.render = render

        # Convert to VecEnv for consistency
        if not isinstance(eval_env, VecEnv):
            # Close over a separate name: `eval_env` is rebound on the next line,
            # so a lambda capturing it would return the wrapper if called later.
            raw_env = eval_env
            eval_env = DummyVecEnv([lambda: raw_env])

        assert eval_env.num_envs == 1, "You must pass only one environment for evaluation"

        self.eval_env = eval_env
        self.best_model_save_path = best_model_save_path
        # Logs will be written in `evaluations.npz`
        if log_path is not None:
            log_path = os.path.join(log_path, 'evaluations')
        self.log_path = log_path
        self.evaluations_results = []
        self.evaluations_timesteps = []
        self.evaluations_length = []

    def _init_callback(self) -> None:
        """Warn on mismatched env types and create the output folders."""
        # Does not work in some corner cases, where the wrapper is not the same
        if type(self.training_env) is not type(self.eval_env):
            warnings.warn("Training and eval env are not of the same type "
                          "{} != {}".format(self.training_env, self.eval_env))

        # Create folders if needed
        if self.best_model_save_path is not None:
            os.makedirs(self.best_model_save_path, exist_ok=True)
        if self.log_path is not None:
            os.makedirs(os.path.dirname(self.log_path), exist_ok=True)

    def _on_step(self) -> bool:
        """
        Evaluate the model every `eval_freq` calls, log the results, save the
        best model and trigger the child callback on a new best mean reward.

        :return: (bool) The child callback's result when a new best is reached
            and a child is set, True otherwise.
        """
        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            # Sync training and eval env if there is VecNormalize
            sync_envs_normalization(self.training_env, self.eval_env)

            episode_rewards, episode_lengths = evaluate_policy(self.model, self.eval_env,
                                                               n_eval_episodes=self.n_eval_episodes,
                                                               render=self.render,
                                                               deterministic=self.deterministic,
                                                               return_episode_rewards=True)

            if self.log_path is not None:
                self.evaluations_timesteps.append(self.num_timesteps)
                self.evaluations_results.append(episode_rewards)
                self.evaluations_length.append(episode_lengths)
                # The whole history is rewritten at each evaluation
                np.savez(self.log_path, timesteps=self.evaluations_timesteps,
                         results=self.evaluations_results, ep_lengths=self.evaluations_length)

            mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)
            mean_ep_length, std_ep_length = np.mean(episode_lengths), np.std(episode_lengths)
            # Keep track of the last evaluation, useful for classes that derive from this callback
            self.last_mean_reward = mean_reward

            if self.verbose > 0:
                print("Eval num_timesteps={}, "
                      "episode_reward={:.2f} +/- {:.2f}".format(self.num_timesteps, mean_reward, std_reward))
                print("Episode length: {:.2f} +/- {:.2f}".format(mean_ep_length, std_ep_length))

            if mean_reward > self.best_mean_reward:
                if self.verbose > 0:
                    print("New best mean reward!")
                if self.best_model_save_path is not None:
                    self.model.save(os.path.join(self.best_model_save_path, 'best_model'))
                self.best_mean_reward = mean_reward
                # Trigger callback if needed
                if self.callback is not None:
                    return self._on_event()

        return True
class StopTrainingOnRewardThreshold(BaseCallback):
    """
    Stop the training once a threshold in episodic reward
    has been reached (i.e. when the model is good enough).

    It must be used with the `EvalCallback`: the decision is based on
    the parent's `best_mean_reward`.

    :param reward_threshold: (float) Minimum expected reward per episode
        to stop training.
    :param verbose: (int)
    """

    def __init__(self, reward_threshold: float, verbose: int = 0):
        super().__init__(verbose=verbose)
        self.reward_threshold = reward_threshold

    def _on_step(self) -> bool:
        """
        :return: (bool) False once the parent's best mean reward is no longer
            below the threshold, True otherwise.
        """
        assert self.parent is not None, ("`StopTrainingOnRewardThreshold` callback must be used "
                                         "with an `EvalCallback`")
        best_so_far = self.parent.best_mean_reward
        # Convert np.bool to bool, otherwise callback.on_step() is False won't work
        keep_training = bool(best_so_far < self.reward_threshold)
        if not keep_training and self.verbose > 0:
            print("Stopping training because the mean reward {:.2f} "
                  " is above the threshold {}".format(best_so_far, self.reward_threshold))
        return keep_training
class EveryNTimesteps(EventCallback):
    """
    Trigger a callback every `n_steps` timesteps

    :param n_steps: (int) Number of timesteps between two trigger.
    :param callback: (BaseCallback) Callback that will be called
        when the event is triggered.
    """

    def __init__(self, n_steps: int, callback: BaseCallback):
        super().__init__(callback)
        self.n_steps = n_steps
        self.last_time_trigger = 0

    def _on_step(self) -> bool:
        """
        :return: (bool) The child callback's result when at least `n_steps`
            timesteps passed since the last trigger, True otherwise.
        """
        due = (self.num_timesteps - self.last_time_trigger) >= self.n_steps
        if not due:
            return True
        # Remember when the event fired so the next interval starts from here
        self.last_time_trigger = self.num_timesteps
        return self._on_event()