# Source code for stable_baselines.her.replay_buffer

import copy
from enum import Enum

import numpy as np

class GoalSelectionStrategy(Enum):
    """
    The strategies for selecting new goals when creating
    artificial transitions.
    """
    # A goal achieved later in the same episode (after the current step)
    FUTURE = 0
    # The goal achieved on the episode's final step
    FINAL = 1
    # Any goal achieved during the same episode
    EPISODE = 2
    # Any goal achieved at some point in training
    # (i.e. any transition still present in the replay buffer)
    RANDOM = 3
# For convenience: map lowercase strategy names to enum members,
# so a strategy can be selected with a plain string.
KEY_TO_GOAL_STRATEGY = {
    strategy.name.lower(): strategy
    for strategy in GoalSelectionStrategy
}
class HindsightExperienceReplayWrapper(object):
    """
    Wrapper around a replay buffer in order to use HER
    (Hindsight Experience Replay).

    This implementation is inspired by the one found in
    https://github.com/NervanaSystems/coach/.

    :param replay_buffer: (ReplayBuffer) the wrapped replay buffer
    :param n_sampled_goal: (int) The number of artificial transitions to generate
        for each actual transition
    :param goal_selection_strategy: (GoalSelectionStrategy) The method that will be used
        to generate the goals for the artificial transitions.
    :param wrapped_env: (HERGoalEnvWrapper) the GoalEnv wrapped using HERGoalEnvWrapper,
        that enables to convert observation to dict, and vice versa
    """

    def __init__(self, replay_buffer, n_sampled_goal, goal_selection_strategy, wrapped_env):
        super(HindsightExperienceReplayWrapper, self).__init__()
        assert isinstance(goal_selection_strategy, GoalSelectionStrategy), \
            "Invalid goal selection strategy, " \
            "please use one of {}".format(list(GoalSelectionStrategy))

        self.n_sampled_goal = n_sampled_goal
        self.goal_selection_strategy = goal_selection_strategy
        self.env = wrapped_env
        # Buffer for storing transitions of the current episode;
        # flushed into the replay buffer once the episode ends.
        self.episode_transitions = []
        self.replay_buffer = replay_buffer

    def add(self, obs_t, action, reward, obs_tp1, done):
        """
        Add a new transition to the buffer.

        :param obs_t: (np.ndarray) the last observation
        :param action: ([float]) the action
        :param reward: (float) the reward of the transition
        :param obs_tp1: (np.ndarray) the new observation
        :param done: (bool) is the episode done
        """
        assert self.replay_buffer is not None
        # Update current episode buffer
        self.episode_transitions.append((obs_t, action, reward, obs_tp1, done))
        if done:
            # Add transitions (and imagined ones) to buffer only when an episode is over
            self._store_episode()
            # Reset episode buffer
            self.episode_transitions = []

    def sample(self, *args, **kwargs):
        """Sample a batch from the wrapped replay buffer (pass-through)."""
        return self.replay_buffer.sample(*args, **kwargs)

    def can_sample(self, n_samples):
        """
        Check if n_samples samples can be sampled from the buffer.

        :param n_samples: (int)
        :return: (bool)
        """
        return self.replay_buffer.can_sample(n_samples)

    def __len__(self):
        return len(self.replay_buffer)

    def _sample_achieved_goal(self, episode_transitions, transition_idx):
        """
        Sample an achieved goal according to the sampling strategy.

        :param episode_transitions: ([tuple]) a list of all the transitions in the current episode
        :param transition_idx: (int) the transition to start sampling from
        :return: (np.ndarray) an achieved goal
        """
        if self.goal_selection_strategy == GoalSelectionStrategy.FUTURE:
            # Sample a goal that was observed in the same episode after the current step
            selected_idx = np.random.choice(np.arange(transition_idx + 1, len(episode_transitions)))
            selected_transition = episode_transitions[selected_idx]
        elif self.goal_selection_strategy == GoalSelectionStrategy.FINAL:
            # Choose the goal achieved at the end of the episode
            selected_transition = episode_transitions[-1]
        elif self.goal_selection_strategy == GoalSelectionStrategy.EPISODE:
            # Random goal achieved during the episode
            selected_idx = np.random.choice(np.arange(len(episode_transitions)))
            selected_transition = episode_transitions[selected_idx]
        elif self.goal_selection_strategy == GoalSelectionStrategy.RANDOM:
            # Random goal achieved, from the entire replay buffer
            selected_idx = np.random.choice(np.arange(len(self.replay_buffer)))
            # BUGFIX: the buffer reference was missing here
            # (`selected_transition =[selected_idx]`), which is a SyntaxError.
            # Index into the wrapped buffer's storage instead.
            selected_transition = self.replay_buffer.storage[selected_idx]
        else:
            raise ValueError("Invalid goal selection strategy, "
                             "please use one of {}".format(list(GoalSelectionStrategy)))
        # A transition is (obs_t, action, reward, obs_tp1, done):
        # the achieved goal is extracted from the (concatenated) observation.
        return self.env.convert_obs_to_dict(selected_transition[0])['achieved_goal']

    def _sample_achieved_goals(self, episode_transitions, transition_idx):
        """
        Sample a batch of achieved goals according to the sampling strategy.

        :param episode_transitions: ([tuple]) list of the transitions in the current episode
        :param transition_idx: (int) the transition to start sampling from
        :return: (np.ndarray) an achieved goal
        """
        return [
            self._sample_achieved_goal(episode_transitions, transition_idx)
            for _ in range(self.n_sampled_goal)
        ]

    def _store_episode(self):
        """
        Sample artificial goals and store transitions of the current
        episode in the replay buffer.
        This method is called only after each end of episode.
        """
        # For each transition in the last episode,
        # create a set of artificial transitions
        for transition_idx, transition in enumerate(self.episode_transitions):
            obs_t, action, reward, obs_tp1, done = transition

            # Add the real transition to the replay buffer
            self.replay_buffer.add(obs_t, action, reward, obs_tp1, done)

            # We cannot sample a goal from the future in the last step of an episode
            if (transition_idx == len(self.episode_transitions) - 1 and
                    self.goal_selection_strategy == GoalSelectionStrategy.FUTURE):
                break

            # Sampled n goals per transition, where n is `n_sampled_goal`
            # this is called k in the paper
            sampled_goals = self._sample_achieved_goals(self.episode_transitions, transition_idx)
            # For each sampled goal, store a new artificial transition
            for goal in sampled_goals:
                # Copy transition to avoid modifying the original one
                obs, action, reward, next_obs, done = copy.deepcopy(transition)

                # Convert concatenated obs to dict, so we can update the goals
                obs_dict, next_obs_dict = map(self.env.convert_obs_to_dict, (obs, next_obs))

                # Update the desired goal in the transition
                obs_dict['desired_goal'] = goal
                next_obs_dict['desired_goal'] = goal

                # Update the reward according to the new desired goal.
                # BUGFIX: gym's GoalEnv.compute_reward(achieved_goal, desired_goal, info)
                # expects the achieved goal FIRST; the original call passed the arguments
                # reversed, which only works when the reward is symmetric in its goals.
                reward = self.env.compute_reward(next_obs_dict['achieved_goal'], goal, None)
                # Can we use achieved_goal == desired_goal?
                done = False

                # Transform back to ndarrays
                obs, next_obs = map(self.env.convert_dict_to_obs, (obs_dict, next_obs_dict))
                # Add artificial transition to the replay buffer
                self.replay_buffer.add(obs, action, reward, next_obs, done)