PPO playing MountainCar-v0 from https://github.com/sgoodfriend/rl-algo-impls/tree/983cb75e43e51cf4ef57f177194ab9a4a1a8808b (3cc5c1d)
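The Generalized Advantage Estimation (GAE) helpers: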
```python
import numpy as np
import torch
from typing import NamedTuple, Sequence

from rl_algo_impls.shared.policy.actor_critic import OnPolicy
from rl_algo_impls.shared.trajectory import Trajectory
from rl_algo_impls.wrappers.vectorable_wrapper import VecEnvObs
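
# Pairs the value-function targets (rewards-to-go) with the GAE advantages.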
class RtgAdvantage(NamedTuple):
    rewards_to_go: torch.Tensor
    advantage: torch.Tensor
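
# Backwards discounted cumulative sum: dc[i] = x[i] + gamma * dc[i + 1].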
def discounted_cumsum(x: np.ndarray, gamma: float) -> np.ndarray:
    dc = x.copy()
    for i in reversed(range(len(x) - 1)):
        dc[i] += gamma * dc[i + 1]
    return dc
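
# GAE over completed trajectories: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t) and
# advantage_t = sum_k (gamma * lambda)^k * delta_{t+k}. If a trajectory was truncated
# rather than terminated, the tail is bootstrapped with policy.value(next_obs).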
def compute_advantage_from_trajectories(
    trajectories: Sequence[Trajectory],
    policy: OnPolicy,
    gamma: float,
    gae_lambda: float,
    device: torch.device,
) -> torch.Tensor:
    advantage = []
    for traj in trajectories:
        last_val = 0
        if not traj.terminated and traj.next_obs is not None:
            last_val = policy.value(traj.next_obs)
        rew = np.append(np.array(traj.rew), last_val)
        v = np.append(np.array(traj.v), last_val)
        deltas = rew[:-1] + gamma * v[1:] - v[:-1]
        advantage.append(discounted_cumsum(deltas, gamma * gae_lambda))
    return torch.as_tensor(
        np.concatenate(advantage), dtype=torch.float32, device=device
    )
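
# Same GAE computation, additionally returning rewards-to-go (the value targets),
# computed as V(s_t) + advantage_t.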
def compute_rtg_and_advantage_from_trajectories(
    trajectories: Sequence[Trajectory],
    policy: OnPolicy,
    gamma: float,
    gae_lambda: float,
    device: torch.device,
) -> RtgAdvantage:
    rewards_to_go = []
    advantages = []
    for traj in trajectories:
        last_val = 0
        if not traj.terminated and traj.next_obs is not None:
            last_val = policy.value(traj.next_obs)
        rew = np.append(np.array(traj.rew), last_val)
        v = np.append(np.array(traj.v), last_val)
        deltas = rew[:-1] + gamma * v[1:] - v[:-1]
        adv = discounted_cumsum(deltas, gamma * gae_lambda)
        advantages.append(adv)
        rewards_to_go.append(v[:-1] + adv)
    return RtgAdvantage(
        torch.as_tensor(
            np.concatenate(rewards_to_go), dtype=torch.float32, device=device
        ),
        torch.as_tensor(np.concatenate(advantages), dtype=torch.float32, device=device),
    )
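
# GAE over a fixed-length vectorized rollout (arrays shaped [n_steps, n_envs]):
# a single backwards pass that bootstraps the last step with policy.value(next_obs)
# and zeroes the recursion across episode boundaries via the episode-start masks.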
def compute_advantages(
    rewards: np.ndarray,
    values: np.ndarray,
    episode_starts: np.ndarray,
    next_episode_starts: np.ndarray,
    next_obs: VecEnvObs,
    policy: OnPolicy,
    gamma: float,
    gae_lambda: float,
) -> np.ndarray:
    advantages = np.zeros_like(rewards)
    last_gae_lam = 0
    n_steps = advantages.shape[0]
    for t in reversed(range(n_steps)):
        if t == n_steps - 1:
            next_nonterminal = 1.0 - next_episode_starts
            next_value = policy.value(next_obs)
        else:
            next_nonterminal = 1.0 - episode_starts[t + 1]
            next_value = values[t + 1]
        delta = rewards[t] + gamma * next_value * next_nonterminal - values[t]
        last_gae_lam = delta + gamma * gae_lambda * next_nonterminal * last_gae_lam
        advantages[t] = last_gae_lam
    return advantages
```
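
A minimal usage sketch (not from the repository) for `compute_advantages`, assuming the definitions above are in scope and that `OnPolicy.value` returns a NumPy array of per-environment value estimates; the zero-value stub policy and the toy rollout shapes are illustrative assumptions only:

```python
import numpy as np


class ZeroValuePolicy:
    """Hypothetical stand-in for OnPolicy: estimates every state's value as 0."""

    def value(self, obs: np.ndarray) -> np.ndarray:
        return np.zeros(obs.shape[0], dtype=np.float32)


n_steps, n_envs = 4, 2
rewards = np.full((n_steps, n_envs), -1.0, dtype=np.float32)  # MountainCar-v0's -1 reward per step
values = np.zeros((n_steps, n_envs), dtype=np.float32)
episode_starts = np.zeros((n_steps, n_envs), dtype=np.float32)
episode_starts[0] = 1.0  # both envs begin an episode at t=0
next_episode_starts = np.zeros(n_envs, dtype=np.float32)  # neither env resets after the last step
next_obs = np.zeros((n_envs, 2), dtype=np.float32)  # MountainCar observations are 2-dimensional

advantages = compute_advantages(
    rewards,
    values,
    episode_starts,
    next_episode_starts,
    next_obs,
    ZeroValuePolicy(),
    gamma=0.99,
    gae_lambda=0.95,
)
print(advantages.shape)  # (4, 2): per-step, per-env GAE advantages
```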