Source code for genrl.utils.utils

import random
from typing import Any, List, Tuple, Union

import gym
import numpy as np
import torch
from torch import nn as nn

from genrl.core.base import BaseActorCritic, BasePolicy, BaseValue
from genrl.core.noise import NoisyLinear
from genrl.environments.vec_env import VecEnv


def get_model(type_: str, name_: str) -> type:
    """
    Utility to get the class of required function

    :param type_: "ac" for Actor Critic, "v" for Value, "p" for Policy
    :param name_: Name of the specific structure of model. (
        Eg. "mlp" or "cnn")
    :type type_: string
    :type name_: string
    :returns: Required class. Eg. MlpActorCritic
    :raises ValueError: If ``type_`` is not one of "ac", "v" or "p"
    """
    # The lookups are imported lazily inside each branch rather than at module
    # level (presumably to avoid a circular import with genrl.core -- confirm).
    if type_ == "ac":
        from genrl.core import get_actor_critic_from_name

        return get_actor_critic_from_name(name_)
    elif type_ == "v":
        from genrl.core import get_value_from_name

        return get_value_from_name(name_)
    elif type_ == "p":
        from genrl.core import get_policy_from_name

        return get_policy_from_name(name_)
    raise ValueError(
        "Unknown model type {!r}; expected one of 'ac', 'v' or 'p'".format(type_)
    )
def mlp(
    sizes: Tuple,
    activation: str = "relu",
    sac: bool = False,
):
    """
    Builds a fully-connected network from a sequence of layer widths

    :param sizes: Widths of consecutive layers; each adjacent pair becomes
        one ``nn.Linear``
    :param activation: "tanh" selects ``nn.Tanh``; any other value selects
        ``nn.ReLU``
    :param sac: True if Soft Actor Critic is being used, else False. When it
        is not ``False`` the last entry of ``sizes`` is left out of the model
    :type sizes: tuple or list
    :type activation: str
    :type sac: bool
    :returns: (``nn.Sequential`` of linear layers, each followed by the
        activation, except the last linear layer which is followed by
        ``nn.Identity``)
    """
    # A single activation module is created and reused after every hidden layer.
    hidden_act = nn.Tanh() if activation == "tanh" else nn.ReLU()

    # Number of entries of `sizes` that actually take part in the network.
    n_used = len(sizes) - 1 if sac is not False else len(sizes)
    last_linear = n_used - 2

    modules = []
    for idx in range(n_used - 1):
        modules.append(nn.Linear(sizes[idx], sizes[idx + 1]))
        # The final linear layer gets a pass-through instead of a non-linearity.
        modules.append(hidden_act if idx < last_linear else nn.Identity())

    return nn.Sequential(*modules)
def cnn(
    channels: Tuple = (4, 16, 32),
    kernel_sizes: Tuple = (8, 4),
    strides: Tuple = (4, 2),
    **kwargs,
) -> Tuple[nn.Sequential, int]:
    """
    (Generates a CNN model given input dimensions, channels, kernel_sizes and
    strides)

    :param channels: Input output channels before and after each convolution
    :param kernel_sizes: Kernel sizes for each convolution
    :param strides: Strides for each convolution
    :param in_size: Input dimensions (assuming square input). Passed as a
        keyword argument; defaults to 84
    :param activation: "tanh" selects ``nn.Tanh``, anything else ``nn.ReLU``.
        Passed as a keyword argument; defaults to "relu"
    :type channels: tuple
    :type kernel_sizes: tuple
    :type strides: tuple
    :type in_size: int
    :type activation: str
    :returns: (Convolutional Neural Network with convolutional layers and
        activation layers, and the number of features obtained by flattening
        its output for a single input)
    """
    output_size = kwargs.get("in_size", 84)
    act_fn = kwargs.get("activation", "relu")
    # One activation module is shared by all convolution stages.
    activation = nn.Tanh() if act_fn == "tanh" else nn.ReLU()

    cnn_layers = []
    # With no convolution at all the feature maps are the untouched input.
    out_channels = channels[0]
    for i in range(len(channels) - 1):
        in_channels, out_channels = channels[i], channels[i + 1]
        kernel_size, stride = kernel_sizes[i], strides[i]
        conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride)
        cnn_layers += [conv, activation]
        # Conv2d (no padding, no dilation) floors its output size, so floor
        # division is required here; true division over-estimates the size
        # whenever (output_size - kernel_size) is not a multiple of stride.
        output_size = (output_size - kernel_size) // stride + 1

    cnn_layers = nn.Sequential(*cnn_layers)
    output_size = int(out_channels * (output_size ** 2))
    return cnn_layers, output_size
def noisy_mlp(
    fc_layers: List[int], noisy_layers: List[int], activation: str = "relu"
):
    """Noisy MLP generating helper function

    The model is a stack of ``nn.Linear`` layers built from ``fc_layers``,
    one ``nn.Linear`` joining ``fc_layers[-1]`` to ``noisy_layers[0]``, and
    then ``NoisyLinear`` layers built from ``noisy_layers``. Every layer
    except the last noisy one is followed by the activation.

    Args:
        fc_layers (:obj:`list` of :obj:`int`): List of fully connected layers
        noisy_layers (:obj:`list` of :obj:`int`): List of noisy layers
        activation (str): Activation function to be used. ["tanh", "relu"]

    Returns:
        Noisy MLP model
    """
    # The activation must be a module *instance*: nn.Sequential rejects a bare
    # class, which made activation="tanh" fail before.
    act = nn.Tanh() if activation == "tanh" else nn.ReLU()

    model = []
    for layer in range(len(fc_layers) - 1):
        model += [nn.Linear(fc_layers[layer], fc_layers[layer + 1]), act]

    # Bridge from the plain linear stack into the noisy stack.
    model += [nn.Linear(fc_layers[-1], noisy_layers[0]), act]

    for layer in range(len(noisy_layers) - 1):
        model += [NoisyLinear(noisy_layers[layer], noisy_layers[layer + 1])]
        # No activation after the final noisy layer.
        if layer < len(noisy_layers) - 2:
            model += [act]

    return nn.Sequential(*model)
def get_env_properties(
    env: Union[gym.Env, VecEnv], network: Union[str, Any] = "mlp"
) -> (Tuple[int]):
    """
    Extracts the dimensions an agent needs from an environment

    The state dimension comes from ``env.framestack`` for "cnn", from
    ``env.observation_space.shape[0]`` for "mlp", and from the network
    object itself (``state_dim`` or ``actor.state_dim``) when a policy,
    value or actor-critic instance is passed.

    :param env: Environment that the agent is interacting with
    :type env: Gym Environment
    :param network: Type of network architecture, eg. "mlp", "cnn", or a
        policy / value / actor-critic object
    :type network: str
    :returns: (State space dimensions, Action space dimensions,
        discreteness of action space and action limit (highest action value),
        which is None for discrete action spaces)
    :rtype: int, float, ...; int, float, ...; bool; int, float, ...
    :raises TypeError: If ``network`` is none of the supported kinds
    :raises NotImplementedError: If the action space is neither Discrete
        nor Box
    """
    # The network is validated first, before the action space is looked at.
    if network == "cnn":
        obs_dim = env.framestack
    elif network == "mlp":
        obs_dim = env.observation_space.shape[0]
    elif isinstance(network, (BasePolicy, BaseValue)):
        obs_dim = network.state_dim
    elif isinstance(network, BaseActorCritic):
        obs_dim = network.actor.state_dim
    else:
        raise TypeError

    act_space = env.action_space
    if isinstance(act_space, gym.spaces.Discrete):
        # Discrete spaces have no action limit.
        return obs_dim, act_space.n, True, None
    if isinstance(act_space, gym.spaces.Box):
        # Only the first component of the upper bound is reported.
        return obs_dim, act_space.shape[0], False, act_space.high[0]
    raise NotImplementedError
def set_seeds(seed: int, env: Union[gym.Env, VecEnv] = None) -> None:
    """
    Seeds torch, numpy and Python's ``random`` for reproducibility

    Also sets cuDNN's ``deterministic`` flag to True and its ``benchmark``
    flag to False.

    :param seed: Seed Value
    :param env: Optionally pass gym environment to set its seed
    :type seed: int
    :type env: Gym Environment
    """
    torch.manual_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    np.random.seed(seed)
    random.seed(seed)

    # The environment is optional; nothing more to do without one.
    if env is None:
        return
    env.seed(seed)
def safe_mean(log: List[int]):
    """
    Mean of ``log`` computed with ``np.mean``, or 0 when ``log`` is empty
    """
    if len(log) == 0:
        return 0
    return np.mean(log)