# Applying policy gradient to OpenAI Gym classic control problems with PyTorch

Marton Trencseni - Tue 12 November 2019 - Machine Learning

## Introduction

In the previous blog post we used a simple reinforcement learning method called policy gradient to solve the CartPole-v1 environment from OpenAI Gym. This post is about seeing how far I can take this basic approach: can it solve the other, harder classic control problems in OpenAI Gym?

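The OpenAI Gym classic control problem set consists of five environments: CartPole-v1, Acrobot-v1, Pendulum-v0, MountainCar-v0 and MountainCarContinuous-v0.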

## Environments

All these problems are similar in that the state space (which is the input space for the policy neural network) is a few real numbers. The action space (which is the output space for the policy) is sometimes discrete (left/right) and sometimes a real (magnitude):

| env | CartPole-v1 | Acrobot-v1 | Pendulum-v0 | MountainCar-v0 | MountainCarContinuous-v0 |
|---|---|---|---|---|---|
| description | balance a pole on a cart | swing up a two-link robot | swing up a pendulum | drive up a big hill | drive up a big hill |
| code | cartpole.py | acrobot.py | pendulum.py | mountain_car.py | continuous_mountain_car.py |
| state | 4 reals: cart position, velocity, pole angle, velocity | 6 reals: sine and cosine of the two rotational joint angles and the joint angular velocities | 3 reals: sine and cosine of the angle and angular velocity | 2 reals: position, velocity | 2 reals: position, velocity |
| action | discrete: left/right | discrete: left/nothing/right, the torque on the second joint | 1 real between -2 and 2, the torque | discrete: left/nothing/right, the force on the car | 1 real between -1 and 1, the force on the car |
| episode length | 500 | 500 | 200 | 200 | 999 |
| reward | +1 for each timestep the agent stays alive | -1 for each timestep the agent takes to swing up | negative reward as a function of the angle | -1 for each timestep the agent doesn’t reach the top of the hill | negative for applied action, +100 once solved |
| reward threshold for solved | 475 | -100 | None (I used -150) | -110 | 90 |

## Coding it up

To attack all the problems with one script, I took the script from the previous post and made it more general. First, a function which returns the input and output space dimensions:

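    import gym

    def in_out_length(env):
        # observations are 1-D vectors, so shape[0] is the number of reals
        input_length = env.observation_space.shape[0]
        if isinstance(env.action_space, gym.spaces.box.Box):
            # continuous action: number of real-valued components
            output_length = env.action_space.shape[0]
        elif isinstance(env.action_space, gym.spaces.discrete.Discrete):
            # discrete action: number of possible actions
            output_length = env.action_space.n
        return input_length, output_length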


For example, for CartPole, it returns (4, 2). The next question is how to attack environments such as Pendulum, where the action is a real number and not discrete. The policy gradient formalism uses probabilities, so I chose to quantize the action space and pretend it’s discrete. This makes the number of quantization levels a parameter. I also wanted to play around with different reward functions when constructing the loss function, so I collected these into an EnvDescription:

    import collections

    fields = [
        'name',
        'model_class',
        # the rest are optional, in case it's needed for the env
        'reward_func',
        'output_quantization_levels',
        'output_range',
        'reward_threshold',
    ]
    # the defaults argument needs Python 3.7+; every field defaults to None
    EnvDescription = collections.namedtuple('EnvDescription', fields, defaults=[None] * len(fields))


The descriptors for the problems are:

    env_descriptions = [
        EnvDescription(name='CartPole-v1',
                       model_class=PolicyShallowNN,
                       ),
        EnvDescription(name='Acrobot-v1',
                       model_class=PolicyShallowNN,
                       ),
        EnvDescription(name='Pendulum-v0',
                       model_class=PolicyDeepNN,
                       reward_func=pendelum_reward_func,
                       output_quantization_levels=101,
                       output_range=[-2, 2],
                       reward_threshold=-150,
                       ),
        EnvDescription(name='MountainCar-v0',
                       model_class=PolicyDeepNN,
                       reward_func=mountaincar_reward_func
                       ),
        EnvDescription(name='MountainCarContinuous-v0',
                       model_class=PolicyDeepNN,
                       reward_func=mountaincar_reward_func,
                       output_quantization_levels=101,
                       output_range=[-1, 1],
                       ),
    ]
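
To make the quantization parameters concrete, here is a minimal sketch of how a discrete action index can be mapped back to a real-valued action. It assumes evenly spaced levels that include both ends of `output_range`. The helper name `dequantize_action` is hypothetical and not part of the original script.

```python
def dequantize_action(index, output_range, levels):
    """Map a discrete action index in [0, levels-1] to a real value in output_range.

    Assumes evenly spaced levels with both endpoints included (hypothetical helper).
    """
    low, high = output_range
    return low + index * float(high - low) / (levels - 1)


# Pendulum-v0 settings from the descriptor above: 101 levels over [-2, 2]
pendulum_actions = [dequantize_action(i, [-2, 2], 101) for i in (0, 50, 100)]
print(pendulum_actions)  # [-2.0, 0.0, 2.0]
```

With 101 levels the middle index maps exactly to zero torque, and neighboring levels are 0.04 apart.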


I defined two policy neural networks: one which is a simple affine map, and one with two hidden layers of 16 units each and ReLU nonlinearities. Both end with softmax to get a probability distribution over the discrete (or discretized) action space, as before:

    import torch.nn as nn
    import torch.nn.functional as F

    class PolicyShallowNN(nn.Module):
        def __init__(self, input_length, output_length):
            super(PolicyShallowNN, self).__init__()
            self.fc = nn.Linear(input_length, output_length)

        def forward(self, x):
            x = self.fc(x)
            return F.softmax(x, dim=1)

    class PolicyDeepNN(nn.Module):
        def __init__(self, input_length, output_length):
            super(PolicyDeepNN, self).__init__()
            self.fc1 = nn.Linear(input_length, 16)
            self.fc2 = nn.Linear(16, 16)
            self.fc3 = nn.Linear(16, output_length)

        def forward(self, x):
            x = self.fc1(x)
            x = F.relu(x)
            x = self.fc2(x)
            x = F.relu(x)
            x = self.fc3(x)
            return F.softmax(x, dim=1)


With these generalizations, we use plain vanilla policy gradient:

• for each episode:
• run the episode to the end
• if the descriptor contains a custom reward function, use that, otherwise use the env’s default rewards; the rewards are then discounted with a gamma factor, multiplied by the log-probabilities of the actions taken, summed, and multiplied by -1 to get the loss (value)
• using gamma = 0.99
• every 10 episodes, use the Adam optimizer to improve the weights of the policy, by backprop’ing from the summed losses (of 10 episodes)
• check whether the average reward of the last 10 episodes is greater than the threshold (always use the env’s rewards here); if yes, we’re done
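
The stopping check in the last step can be sketched as follows. The original script calls helpers named `is_solved` and `reward_threshold` that are not shown in this post, so the function below is a guess at their behavior under a different name (`is_solved_sketch`), not the author's code. Requiring at least 10 finished episodes before declaring success is also an assumption.

```python
def is_solved_sketch(episode_rewards, threshold, window=10):
    """Return True if the mean of the last `window` episode rewards exceeds threshold.

    Hypothetical helper: needs at least `window` finished episodes and is meant to be
    fed the environment's own rewards, not the custom ones used for the loss.
    """
    if threshold is None or len(episode_rewards) < window:
        return False
    recent = episode_rewards[-window:]
    return sum(recent) / window > threshold


# CartPole-v1 counts as solved at an average reward above 475
print(is_solved_sketch([500.0] * 10, 475))           # True
print(is_solved_sketch([500.0] * 9 + [100.0], 475))  # False (average is 460)
```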

Computing the loss function:

    def compute_loss(probs, rewards, states=None, reward_func=None, gamma=0.99):
        # optionally replace the env's rewards with a custom, shaped reward
        if reward_func is not None:
            rewards = reward_func(rewards, states)
        scaled_rewards = []
        # decay rewards with gamma
        tail = 0
        for reward in rewards[::-1]: # backward
            tail = reward + gamma * tail
            scaled_rewards.insert(0, tail) # insert at beginning
        loss = 0
        # p is the log-probability of the action taken, r its discounted return
        for p, r in zip(probs, scaled_rewards):
            loss += p * r
        loss *= -1
        return loss
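
As a small worked example of the discounting step inside `compute_loss`, the sketch below computes the discounted returns on their own in plain Python. The function name `discounted_returns` is made up for this illustration; it appends and reverses instead of inserting at the front, which gives the same result.

```python
def discounted_returns(rewards, gamma=0.99):
    """Return G_t = r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ... for every t."""
    returns = []
    tail = 0.0
    for reward in reversed(rewards):  # walk the episode backward
        tail = reward + gamma * tail
        returns.append(tail)
    returns.reverse()  # restore chronological order
    return returns


# gamma = 0.5 keeps the arithmetic exact: the last step sees only its own reward
print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Early actions get credit for everything that follows them, while the final action is only credited with its own reward.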


The main training loop:

    import numpy as np

    def train_solve(env_desc, num_episodes=100*1000):
        print('Doing %s' % env_desc.name)
        env = gym.make(env_desc.name)
        input_length, output_length = in_out_length(env)
        if env_desc.output_quantization_levels is not None:
            output_length *= env_desc.output_quantization_levels
        model = env_desc.model_class(input_length, output_length)
        episode_rewards = []
        loss = 0
        for episode in range(num_episodes):
            if episode > 0: print('.', end=('' if bool(episode % 100) else '\n'))
            probs, rewards, states = [], [], []
            state = env.reset()
            for t in range(env._max_episode_steps):
                action, prob = select_action_from_policy(model, state)
                if env_desc.output_quantization_levels is not None:
                    # map the quantized action index back onto the continuous output range
                    action = env_desc.output_range[0] + action * float(env_desc.output_range[1] - env_desc.output_range[0]) / (env_desc.output_quantization_levels-1)
                    action = np.array([action])
                state, reward, done, _ = env.step(action)
                probs.append(prob)
                rewards.append(reward)
                states.append(state.copy())
                if done:
                    break
            episode_reward = sum(rewards)
            episode_rewards.append(episode_reward)
            # the solved check always uses the env's own rewards
            if is_solved(env, episode_rewards, reward_threshold(env, env_desc)):
                print('\nSolved in %s episodes!' % (episode+1))
                return env, model
            loss += compute_loss(probs, rewards, states, env_desc.reward_func)
            if episode > 0 and episode % 10 == 0:
                ...  # the listing is cut off here in the source; this is where the 10-episode Adam update happens
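
The training loop relies on `select_action_from_policy`, which is not shown in this post, and the listing ends just before the optimizer update. The sketch below is one plausible way to write both pieces with PyTorch. It assumes the policy returns softmax probabilities of shape (1, n_actions) and that the helper returns the log-probability of the sampled action. The names `TinyPolicy` and `select_action_sketch`, the fake episode, and the learning rate are assumptions for illustration only.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical


class TinyPolicy(nn.Module):
    """Stand-in for the shallow policy: affine map followed by softmax."""
    def __init__(self, input_length, output_length):
        super().__init__()
        self.fc = nn.Linear(input_length, output_length)

    def forward(self, x):
        return F.softmax(self.fc(x), dim=1)


def select_action_sketch(model, state):
    """Sample an action index and return it with its log-probability (assumed behavior)."""
    x = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)  # add batch dimension
    dist = Categorical(probs=model(x))
    action = dist.sample()
    return action.item(), dist.log_prob(action).squeeze(0)


torch.manual_seed(0)
model = TinyPolicy(input_length=4, output_length=2)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)  # lr is an assumed value

# Fake "episode": three steps from a fixed state, each with discounted return 1.0
loss = 0
for _ in range(3):
    action, log_prob = select_action_sketch(model, [0.0, 0.1, -0.1, 0.0])
    loss = loss + (-1.0 * log_prob * 1.0)

before = model.fc.weight.detach().clone()
optimizer.zero_grad()
loss.backward()   # backprop through the accumulated loss
optimizer.step()  # one Adam update of the policy weights
loss = 0          # start accumulating again for the next batch of episodes
```

Resetting `loss` after the step matters: otherwise the next update would try to backprop through a graph that has already been consumed.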

This approach is able to solve CartPole-v1 and Acrobot-v1, but fails on the Pendulum and MountainCar problems. The optimizer gets stuck in a bad local minimum and never finds a good policy. An interesting bad behavior happens for positive loss functions: the loss is structured like loss := -1 * sum(reward x log(probability of action taken)), where log(probability of action taken) is negative, so the overall expression is positive, assuming the reward is positive. In this case, a loss of 0 is a global minimum. The optimizer can reach it by setting the probabilities of the actions of an arbitrary sub-optimal policy to one, which makes each log(probability) zero and hence the entire loss zero, even though the policy it converged to is essentially "random".
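
The collapse described above can be checked numerically. The sketch below evaluates the loss formula for a made-up three-step episode with positive discounted rewards. The loss shrinks to zero as the probabilities of the taken actions approach one, regardless of whether those actions were any good. The function name and the numbers are illustrative assumptions.

```python
import math


def policy_gradient_loss(action_probs, discounted_rewards):
    """loss = -1 * sum(reward * log(probability of action taken))."""
    return -1 * sum(r * math.log(p) for p, r in zip(action_probs, discounted_rewards))


rewards = [3.0, 2.0, 1.0]  # positive rewards, as in CartPole

uncertain = policy_gradient_loss([0.5, 0.5, 0.5], rewards)
confident = policy_gradient_loss([0.999, 0.999, 0.999], rewards)
deterministic = policy_gradient_loss([1.0, 1.0, 1.0], rewards)

# The loss is positive and falls toward 0 as the policy becomes deterministic
print(uncertain > confident > deterministic)  # True
```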