Source code for genrl.core.rollout_storage

from typing import Generator, NamedTuple, Optional, Union

import gym
import numpy as np
import torch

from genrl.environments.vec_env import VecEnv


class RolloutBufferSamples(NamedTuple):
    observations: torch.Tensor
    actions: torch.Tensor
    old_values: torch.Tensor
    old_log_prob: torch.Tensor
    advantages: torch.Tensor
    returns: torch.Tensor


class ReplayBufferSamples(NamedTuple):
    observations: torch.Tensor
    actions: torch.Tensor
    next_observations: torch.Tensor
    dones: torch.Tensor
    rewards: torch.Tensor


class RolloutReturn(NamedTuple):
    episode_reward: float
    episode_timesteps: int
    n_episodes: int
    continue_training: bool


class BaseBuffer(object):
    """
    Base class that represents a buffer (rollout or replay)

    :param buffer_size: (int) Max number of elements in the buffer
    :param env: (Environment) The environment being trained on
    :param device: (Union[torch.device, str]) PyTorch device to which the
        values will be converted
    """

    def __init__(
        self,
        buffer_size: int,
        env: Union[gym.Env, VecEnv],
        device: Union[torch.device, str] = "cpu",
    ):
        super(BaseBuffer, self).__init__()
        self.buffer_size = buffer_size
        self.env = env
        self.pos = 0
        self.full = False
        self.device = device

    @staticmethod
    def swap_and_flatten(arr: np.ndarray) -> np.ndarray:
        """
        Swap and then flatten axes 0 (buffer_size) and 1 (n_envs) to convert shape
        from [n_steps, n_envs, ...] (where ... is the shape of the features)
        to [n_steps * n_envs, ...] (which maintains the order)

        :param arr: (np.ndarray)
        :return: (np.ndarray)
        """
        shape = arr.shape
        if len(shape) < 3:
            shape = shape + (1,)
        return arr.swapaxes(0, 1).reshape(shape[0] * shape[1], *shape[2:])
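
    # For example, swap_and_flatten turns an array of shape
    # (n_steps, n_envs, obs_dim) = (5, 8, 3) into one of shape
    # (n_steps * n_envs, obs_dim) = (40, 3); a 2-D array such as
    # rewards, of shape (5, 8), becomes (40, 1).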

    def size(self) -> int:
        """
        :return: (int) The current size of the buffer
        """
        if self.full:
            return self.buffer_size
        return self.pos

    def add(self, *args, **kwargs) -> None:
        """
        Add elements to the buffer.
        """
        raise NotImplementedError()

    def extend(self, *args, **kwargs) -> None:
        """
        Add a new batch of transitions to the buffer
        """
        # Do a for loop along the batch axis
        for data in zip(*args):
            self.add(*data)

    def reset(self) -> None:
        """
        Reset the buffer.
        """
        self.pos = 0
        self.full = False

    def sample(
        self,
        batch_size: int,
    ):
        """
        :param batch_size: (int) Number of elements to sample
        :return: (Union[RolloutBufferSamples, ReplayBufferSamples])
        """
        upper_bound = self.buffer_size if self.full else self.pos
        batch_inds = np.random.randint(0, upper_bound, size=batch_size)
        return self._get_samples(batch_inds)

    def _get_samples(
        self,
        batch_inds: np.ndarray,
    ):
        """
        :param batch_inds: (np.ndarray)
        :return: (Union[RolloutBufferSamples, ReplayBufferSamples])
        """
        raise NotImplementedError()

    def to_torch(self, array: np.ndarray, copy: bool = True) -> torch.Tensor:
        """
        Convert a numpy array to a PyTorch tensor.

        Note: it copies the data by default.

        :param array: (np.ndarray)
        :param copy: (bool) Whether or not to copy the data
            (may be useful to avoid changing things by reference)
        :return: (torch.Tensor)
        """
        if copy:
            return torch.tensor(array).to(self.device)
        return torch.as_tensor(array).to(self.device)


class RolloutBuffer(BaseBuffer):
    """
    Rollout buffer used in on-policy algorithms like A2C/PPO.

    :param buffer_size: (int) Max number of elements in the buffer
    :param env: (Environment) The environment being trained on
    :param device: (torch.device)
    :param gae_lambda: (float) Factor for trade-off of bias vs variance for the
        Generalized Advantage Estimator. Equivalent to classic advantage when set to 1.
    :param gamma: (float) Discount factor
    """

    def __init__(
        self,
        buffer_size: int,
        env: Union[gym.Env, VecEnv],
        device: Union[torch.device, str] = "cpu",
        gae_lambda: float = 1,
        gamma: float = 0.99,
    ):
        super(RolloutBuffer, self).__init__(buffer_size, env, device)
        self.gae_lambda = gae_lambda
        self.gamma = gamma
        self.observations, self.actions, self.rewards, self.advantages = (
            None,
            None,
            None,
            None,
        )
        self.returns, self.dones, self.values, self.log_probs = None, None, None, None
        self.generator_ready = False
        self.reset()

    def reset(self) -> None:
        self.observations = np.zeros(
            (self.buffer_size, self.env.n_envs) + self.env.obs_shape,
            dtype=np.float32,
        )
        self.actions = np.zeros(
            (self.buffer_size, self.env.n_envs) + self.env.action_shape,
            dtype=np.float32,
        )
        self.rewards = np.zeros((self.buffer_size, self.env.n_envs), dtype=np.float32)
        self.returns = np.zeros((self.buffer_size, self.env.n_envs), dtype=np.float32)
        self.dones = np.zeros((self.buffer_size, self.env.n_envs), dtype=np.float32)
        self.values = np.zeros((self.buffer_size, self.env.n_envs), dtype=np.float32)
        self.log_probs = np.zeros((self.buffer_size, self.env.n_envs), dtype=np.float32)
        self.advantages = np.zeros(
            (self.buffer_size, self.env.n_envs), dtype=np.float32
        )
        self.generator_ready = False
        super(RolloutBuffer, self).reset()

    def compute_returns_and_advantage(
        self, last_value: torch.Tensor, dones: np.ndarray, use_gae: bool = False
    ) -> None:
        """
        Post-processing step: compute the returns (sum of discounted rewards)
        and advantage (A(s) = R - V(S)).

        Adapted from Stable-Baselines PPO2.

        :param last_value: (torch.Tensor)
        :param dones: (np.ndarray)
        :param use_gae: (bool) Whether to use Generalized Advantage Estimation
            or normal advantage for advantage computation.
        """
        last_value = last_value.flatten()

        if use_gae:
            last_gae_lam = 0
            for step in reversed(range(self.buffer_size)):
                if step == self.buffer_size - 1:
                    next_non_terminal = 1.0 - dones
                    next_value = last_value
                else:
                    next_non_terminal = 1.0 - self.dones[step + 1]
                    next_value = self.values[step + 1]
                delta = (
                    self.rewards[step]
                    + self.gamma * next_value * next_non_terminal
                    - self.values[step]
                )
                last_gae_lam = (
                    delta
                    + self.gamma * self.gae_lambda * next_non_terminal * last_gae_lam
                )
                self.advantages[step] = last_gae_lam
            self.returns = self.advantages + self.values
        else:
            # Discounted return with value bootstrap
            # Note: this is equivalent to GAE computation
            # with gae_lambda = 1.0
            last_return = 0.0
            for step in reversed(range(self.buffer_size)):
                if step == self.buffer_size - 1:
                    next_non_terminal = 1.0 - dones
                    next_value = last_value
                    last_return = self.rewards[step] + next_non_terminal * next_value
                else:
                    next_non_terminal = 1.0 - self.dones[step + 1]
                    last_return = (
                        self.rewards[step]
                        + self.gamma * last_return * next_non_terminal
                    )
                self.returns[step] = last_return
            self.advantages = self.returns - self.values
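
    # Note: the GAE branch of compute_returns_and_advantage above implements the
    # standard recursion
    #   delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_{t+1}) - V(s_t)
    #   A_t     = delta_t + gamma * gae_lambda * (1 - done_{t+1}) * A_{t+1}
    # and the returns are then recovered as R_t = A_t + V(s_t).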

    def add(
        self,
        obs: np.ndarray,
        action: np.ndarray,
        reward: np.ndarray,
        done: np.ndarray,
        value: torch.Tensor,
        log_prob: torch.Tensor,
    ) -> None:
        """
        :param obs: (np.ndarray) Observation
        :param action: (np.ndarray) Action
        :param reward: (np.ndarray)
        :param done: (np.ndarray) End of episode signal.
        :param value: (torch.Tensor) Estimated value of the current state
            following the current policy.
        :param log_prob: (torch.Tensor) Log probability of the action
            following the current policy.
        """
        if len(log_prob.shape) == 0:
            # Reshape 0-d tensor to avoid error
            log_prob = log_prob.reshape(-1, 1)

        self.observations[self.pos] = np.array(obs).copy()
        self.actions[self.pos] = np.array(action).copy()
        self.rewards[self.pos] = np.array(reward).copy()
        self.dones[self.pos] = np.array(done).copy()
        self.values[self.pos] = value.clone().cpu().numpy().flatten()
        self.log_probs[self.pos] = log_prob.clone().cpu().numpy().flatten()
        self.pos += 1
        if self.pos == self.buffer_size:
            self.full = True

    def get(
        self, batch_size: Optional[int] = None
    ) -> Generator[RolloutBufferSamples, None, None]:
        assert self.full, "Rollout buffer must be full before sampling from it"
        indices = np.random.permutation(self.buffer_size * self.env.n_envs)
        # Prepare the data
        if not self.generator_ready:
            for tensor in [
                "observations",
                "actions",
                "values",
                "log_probs",
                "advantages",
                "returns",
            ]:
                self.__dict__[tensor] = self.swap_and_flatten(self.__dict__[tensor])
            self.generator_ready = True

        # Return everything, don't create minibatches
        if batch_size is None:
            batch_size = self.buffer_size * self.env.n_envs

        start_idx = 0
        while start_idx < self.buffer_size * self.env.n_envs:
            yield self._get_samples(indices[start_idx : start_idx + batch_size])
            start_idx += batch_size

    def _get_samples(self, batch_inds: np.ndarray) -> RolloutBufferSamples:
        data = (
            self.observations[batch_inds],
            self.actions[batch_inds],
            self.values[batch_inds].flatten(),
            self.log_probs[batch_inds].flatten(),
            self.advantages[batch_inds].flatten(),
            self.returns[batch_inds].flatten(),
        )
        return RolloutBufferSamples(*tuple(map(self.to_torch, data)))
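

# Minimal usage sketch (illustrative only, not part of the original module).
# The dummy environment below stands in for a real VecEnv and supplies just the
# attributes this module reads from ``env``: n_envs, obs_shape and action_shape.
if __name__ == "__main__":
    from types import SimpleNamespace

    dummy_env = SimpleNamespace(n_envs=2, obs_shape=(4,), action_shape=(1,))
    buffer = RolloutBuffer(buffer_size=8, env=dummy_env, gae_lambda=0.95, gamma=0.99)

    # Fill the buffer with dummy transitions, one batch of n_envs per step.
    for _ in range(buffer.buffer_size):
        buffer.add(
            obs=np.zeros((2, 4), dtype=np.float32),
            action=np.zeros((2, 1), dtype=np.float32),
            reward=np.ones(2, dtype=np.float32),
            done=np.zeros(2, dtype=np.float32),
            value=torch.zeros(2),
            log_prob=torch.zeros(2),
        )

    # Bootstrap with the value of the last state, then iterate over minibatches.
    buffer.compute_returns_and_advantage(
        last_value=torch.zeros(2), dones=np.zeros(2, dtype=np.float32), use_gae=True
    )
    for batch in buffer.get(batch_size=4):
        print(batch.observations.shape, batch.returns.shape)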