import random
from typing import Any, List, Tuple, Union

import gym
import numpy as np
import torch  # noqa
import torch.nn as nn  # noqa

from genrl.core.base import BaseActorCritic, BasePolicy, BaseValue
from genrl.core.noise import NoisyLinear
from genrl.environments.vec_env import VecEnv

def get_model(type_: str, name_: str) -> Any:
    """
    Utility to get the class of required function

    :param type_: "ac" for Actor Critic, "v" for Value, "p" for Policy
    :param name_: Name of the specific structure of model. (
        Eg. "mlp" or "cnn")
    :type type_: string
    :type name_: string
    :returns: Required class. Eg. MlpActorCritic
    :raises ValueError: If ``type_`` is not one of "ac", "v" or "p"
    """
    # The lookups are imported inside each branch rather than at module level
    # (presumably to avoid a circular import with genrl.core -- confirm).
    if type_ == "ac":
        from genrl.core import get_actor_critic_from_name

        return get_actor_critic_from_name(name_)
    elif type_ == "v":
        from genrl.core import get_value_from_name

        return get_value_from_name(name_)
    elif type_ == "p":
        from genrl.core import get_policy_from_name

        return get_policy_from_name(name_)
    raise ValueError(
        f'Unknown model type {type_!r}; expected one of "ac", "v" or "p"'
    )
def mlp(
    sizes: Tuple,
    activation: str = "relu",
    sac: bool = False,
):
    """
    Builds a fully-connected network from a sequence of layer widths

    Consecutive entries of ``sizes`` become the input/output widths of one
    linear layer. Every linear layer is followed by the chosen activation,
    except the last one, which is followed by an identity.

    :param sizes: Widths of the layers, input first
    :param activation: "tanh" selects Tanh, anything else selects ReLU
    :param sac: True if Soft Actor Critic is being used, else False; when it
        is not ``False`` the final entry of ``sizes`` is left out
    :type sizes: tuple or list
    :type activation: str
    :type sac: bool
    :returns: (Neural Network with fully-connected linear layers and
        activation layers)
    """
    if sac is False:
        depth = len(sizes)
    else:
        depth = len(sizes) - 1

    # One activation module is created and reused after every hidden layer.
    if activation == "tanh":
        hidden_act = nn.Tanh()
    else:
        hidden_act = nn.ReLU()

    last_index = depth - 2
    modules = []
    for idx in range(depth - 1):
        modules.append(nn.Linear(sizes[idx], sizes[idx + 1]))
        if idx < last_index:
            modules.append(hidden_act)
        else:
            # The output layer is left without a non-linearity.
            modules.append(nn.Identity())
    return nn.Sequential(*modules)
def cnn(
    channels: Tuple = (4, 16, 32),
    kernel_sizes: Tuple = (8, 4),
    strides: Tuple = (4, 2),
    **kwargs,
) -> Tuple:
    """
    (Generates a CNN model given input dimensions, channels, kernel_sizes and
    strides)

    :param channels: Input output channels before and after each convolution
    :param kernel_sizes: Kernel sizes for each convolution
    :param strides: Strides for each convolution
    :param in_size: Input dimensions (assuming square input); keyword only,
        defaults to 84
    :param activation: "tanh" selects Tanh, anything else selects ReLU;
        keyword only, defaults to "relu"
    :type channels: tuple
    :type kernel_sizes: tuple
    :type strides: tuple
    :type in_size: int
    :type activation: str
    :returns: (Convolutional Neural Network with convolutional layers and
        activation layers) together with the flattened size of its output
    :raises ValueError: If fewer than two channel counts are given
    """
    if len(channels) < 2:
        raise ValueError(
            "channels must contain at least an input and an output channel "
            "count, got {!r}".format(channels)
        )

    output_size = kwargs.get("in_size", 84)
    act_fn = kwargs.get("activation", "relu")
    activation = nn.Tanh() if act_fn == "tanh" else nn.ReLU()

    cnn_layers = []
    for i in range(len(channels) - 1):
        in_channels, out_channels = channels[i], channels[i + 1]
        kernel_size, stride = kernel_sizes[i], strides[i]
        cnn_layers += [
            nn.Conv2d(in_channels, out_channels, kernel_size, stride),
            activation,
        ]
        # Conv2d (no padding, no dilation) floors its spatial output size, so
        # the running size must be floored too; true division drifts away from
        # the real shape whenever (size - kernel) is not a multiple of stride.
        output_size = (output_size - kernel_size) // stride + 1

    cnn_layers = nn.Sequential(*cnn_layers)
    # Flattened feature count: channels of the last conv times the square map.
    output_size = int(out_channels * (output_size ** 2))
    return cnn_layers, output_size
def noisy_mlp(fc_layers: List[int], noisy_layers: List[int], activation="relu"):
    """Noisy MLP generating helper function

    The model is a stack of ordinary linear layers built from ``fc_layers``,
    a linear layer bridging ``fc_layers[-1]`` to ``noisy_layers[0]``, and then
    one ``NoisyLinear`` per consecutive pair in ``noisy_layers``. The
    activation follows every layer except the final noisy one.

    Args:
        fc_layers (:obj:`list` of :obj:`int`): List of fully connected layers
        noisy_layers (:obj:`list` of :obj:`int`): List of noisy layers
        activation (str): Activation function to be used. ["tanh", "relu"]

    Returns:
        Noisy MLP model
    """
    # Both branches must yield a module *instance*: nn.Sequential rejects a
    # bare class such as ``nn.Tanh``.
    act = nn.Tanh() if activation == "tanh" else nn.ReLU()

    model = []
    for in_features, out_features in zip(fc_layers[:-1], fc_layers[1:]):
        model += [nn.Linear(in_features, out_features), act]

    # Bridge from the fully connected stack into the noisy stack.
    model += [nn.Linear(fc_layers[-1], noisy_layers[0]), act]

    last_noisy = len(noisy_layers) - 2
    for layer in range(len(noisy_layers) - 1):
        model.append(NoisyLinear(noisy_layers[layer], noisy_layers[layer + 1]))
        if layer < last_noisy:
            model.append(act)

    return nn.Sequential(*model)
def get_env_properties(
    env: Union[gym.Env, VecEnv], network: Union[str, Any] = "mlp"
) -> Tuple[int, int, bool, Any]:
    """
    Finds important properties of environment

    :param env: Environment that the agent is interacting with
    :type env: Gym Environment
    :param network: Type of network architecture, eg. "mlp", "cnn", or an
        already constructed policy, value or actor-critic object
    :type network: str
    :returns: (State space dimensions, Action space dimensions,
        discreteness of action space and action limit (highest action value)
    :rtype: int, float, ...; int, float, ...; bool; int, float, ...
    :raises TypeError: If ``network`` is neither a known name nor a supported
        network object
    :raises NotImplementedError: If the action space is neither Discrete nor
        Box
    """
    if network == "cnn":
        state_dim = env.framestack
    elif network == "mlp":
        state_dim = env.observation_space.shape[0]
    elif isinstance(network, (BasePolicy, BaseValue)):
        state_dim = network.state_dim
    elif isinstance(network, BaseActorCritic):
        # NOTE(review): the right-hand side of this assignment was missing in
        # the source under review. Reconstructed on the assumption that an
        # actor-critic exposes its actor as ``.actor`` with a ``state_dim``
        # attribute -- confirm against BaseActorCritic before merging.
        state_dim = network.actor.state_dim
    else:
        raise TypeError(
            "network must be 'mlp', 'cnn', or a BasePolicy, BaseValue or "
            "BaseActorCritic instance, got {!r}".format(network)
        )

    if isinstance(env.action_space, gym.spaces.Discrete):
        action_dim = env.action_space.n
        discrete = True
        # Discrete action spaces have no upper bound to report.
        action_lim = None
    elif isinstance(env.action_space, gym.spaces.Box):
        action_dim = env.action_space.shape[0]
        # Only the first component's upper bound is read.
        action_lim = env.action_space.high[0]
        discrete = False
    else:
        raise NotImplementedError(
            "Unsupported action space: {!r}".format(env.action_space)
        )

    return state_dim, action_dim, discrete, action_lim
def set_seeds(seed: int, env: Union[gym.Env, VecEnv] = None) -> None:
    """
    Seeds the random number generators used by this module

    Seeds torch, numpy and Python's ``random`` with the same value, switches
    cuDNN to deterministic mode with benchmarking turned off, and seeds the
    environment as well when one is supplied.

    :param seed: Seed Value
    :param env: Optionally pass gym environment to set its seed
    :type seed: int
    :type env: Gym Environment
    """
    torch.manual_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    np.random.seed(seed)
    random.seed(seed)

    if env is None:
        return
    env.seed(seed)
def safe_mean(log: Union[torch.Tensor, List[int]]):
    """
    Returns the mean of ``log``, or 0 if there are no elements in logs

    :param log: Values to average
    :type log: torch.Tensor or list
    :returns: 0 for an empty ``log``; otherwise ``torch.mean`` of a tensor or
        ``np.mean`` of any other sequence
    """
    if len(log) == 0:
        return 0
    if isinstance(log, torch.Tensor):
        # torch.mean only accepts floating point or complex input, so integer
        # (and bool) tensors are promoted first instead of raising.
        if not (log.is_floating_point() or log.is_complex()):
            log = log.float()
        return torch.mean(log)
    return np.mean(log)