强化学习(DQN)教程

作者Adam Paszke

译者wutong Zhang

校验:wutong Zhang

本教程介绍了如何使用PyTorch训练一个Deep Q-learning(DQN)智能点(Agent)来完成OpenAI Gym中的CartPole-V0任务。

任务

智能点需要决定两种动作:向左或向右来使其上的杆保持直立。 你可以在OpenAI Gym找到一个有各种算法和可视化的官方排行榜。

cartpole

当智能点观察环境的当前状态并选择动作时,环境将转换为新状态,并返回指示动作结果的奖励。在这项任务中,每增加一个时间步,奖励+1,如果杆子掉得太远或大车移动距离中心超过2.4个单位,环境就会终止。这意味着更好的执行场景将持续更长的时间,积累更大的回报。

Cartpole任务的设计为智能点输入代表环境状态(位置、速度等)的4个实际值。然而,神经网络完全可以通过观察场景来解决这个任务,所以我们将使用以车为中心的一块屏幕作为输入。因此,我们的结果无法直接与官方排行榜上的结果相比——我们的任务更艰巨。不幸的是,这会减慢训练速度,因为我们必须渲染所有帧。

严格地说,我们将以当前帧和前一个帧之间的差异来呈现状态。这将允许代理从一张图像中考虑杆子的速度。

首先你需要导入必须的包。我们需要 gym 作为环境 (使用 pip install gym 安装). 我们也需要 PyTorch 的如下功能:

  • 神经网络(torch.nn
  • 优化(torch.optim
  • 自动微分(torch.autograd
  • 对于视觉任务工具(torchvision- 一个单独的包
  1. import gym
  2. import math
  3. import random
  4. import numpy as np
  5. import matplotlib
  6. import matplotlib.pyplot as plt
  7. from collections import namedtuple
  8. from itertools import count
  9. from PIL import Image
  10. import torch
  11. import torch.nn as nn
  12. import torch.optim as optim
  13. import torch.nn.functional as F
  14. import torchvision.transforms as T
  15. env = gym.make('CartPole-v0').unwrapped
  16. # set up matplotlib
  17. is_ipython = 'inline' in matplotlib.get_backend()
  18. if is_ipython:
  19. from IPython import display
  20. plt.ion()
  21. # if gpu is to be used
  22. device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

回放内存

我们将使用经验回放内存来训练DQN。它存储智能点观察到的转换,允许我们稍后重用此数据。通过从中随机抽样,组成批对象的转换将被取消相关性。结果表明,这大大稳定和改进了DQN训练过程。

因此,我们需要两个类别:

  • Transition- 一个命名的元组,表示我们环境中的单个转换。它基本上将(状态、动作)对映射到它们的(下一个状态、奖励)结果,状态是屏幕差分图像,如后面所述。
  • ReplayMemory- 一个有界大小的循环缓冲区,用于保存最近观察到的转换。它还实现了一个.sample()方法,用于选择一批随机转换进行训练。
  1. Transition = namedtuple('Transition',
  2. ('state', 'action', 'next_state', 'reward'))
  3. class ReplayMemory(object):
  4. def __init__(self, capacity):
  5. self.capacity = capacity
  6. self.memory = []
  7. self.position = 0
  8. def push(self, *args):
  9. """Saves a transition."""
  10. if len(self.memory) < self.capacity:
  11. self.memory.append(None)
  12. self.memory[self.position] = Transition(*args)
  13. self.position = (self.position + 1) % self.capacity
  14. def sample(self, batch_size):
  15. return random.sample(self.memory, batch_size)
  16. def __len__(self):
  17. return len(self.memory)

现在我们来定义自己的模型。但首先来快速了解一下DQN。

DQN算法

我们的环境是确定的,所以这里提出的所有方程也都是确定性的,为了简单起见。在强化学习文献中,它们还包含对环境中随机转换的期望。

我们的目标是制定一项策略,试图最大化折扣、累积奖励

强化学习(DQN)教程 - 图2

,其中

强化学习(DQN)教程 - 图3

也被认为是返回值。 折扣,

强化学习(DQN)教程 - 图4

应该是介于

强化学习(DQN)教程 - 图5

强化学习(DQN)教程 - 图6

之间的常量,以确保和收敛。它使来自不确定的遥远未来的回报对我们的智能点来说比它在不久的将来相当有信心的回报更不重要。

Q-Learning背后的主要思想是,如果我们有一个函数

强化学习(DQN)教程 - 图7

,则如果我们在特定的状态下采取行动,那么我们可以很容易地构建一个最大化回报的策略:

强化学习(DQN)教程 - 图8

然而,我们并不了解世界的一切,因此我们无法访问

强化学习(DQN)教程 - 图9

。但是,由于神经网络是通用的函数逼近器,我们可以简单地创建一个并训练它类似于

强化学习(DQN)教程 - 图10

对对于我们的训练更新规则,我们将假设某些策略的每个

强化学习(DQN)教程 - 图11

函数都遵循Bellman方程:

强化学习(DQN)教程 - 图12

等式两边的差异被称为时间差误差,即

强化学习(DQN)教程 - 图13

强化学习(DQN)教程 - 图14

为了尽量减少这个错误,我们将使用Huber loss。Huber损失在误差很小的情况下表现为均方误差,但在误差较大的情况下表现为平均绝对误差 —— 这使得当对

强化学习(DQN)教程 - 图15

的估计噪音很大时,对异常值的鲁棒性更强。我们通过从重放内存中取样的一批转换来计算

强化学习(DQN)教程 - 图16

强化学习(DQN)教程 - 图17

强化学习(DQN)教程 - 图18

Q-网络

我们的模型将是一个卷积神经网络需要在当前和以前的屏幕补丁之间的差异。它具有两个输出端,表示

强化学习(DQN)教程 - 图19

强化学习(DQN)教程 - 图20

(其中

强化学习(DQN)教程 - 图21

是输入到网络)。实际上,网络正试图预测在给定电流输入的情况下采取每项行动的预期回报。

  1. class DQN(nn.Module):
  2. def __init__(self, h, w, outputs):
  3. super(DQN, self).__init__()
  4. self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
  5. self.bn1 = nn.BatchNorm2d(16)
  6. self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
  7. self.bn2 = nn.BatchNorm2d(32)
  8. self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
  9. self.bn3 = nn.BatchNorm2d(32)
  10. # 线性输入连接的数量取决于conv2d层的输出,因此需要计算输入图像的大小。
  11. def conv2d_size_out(size, kernel_size = 5, stride = 2):
  12. return (size - (kernel_size - 1) - 1) // stride + 1
  13. convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
  14. convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
  15. linear_input_size = convw * convh * 32
  16. self.head = nn.Linear(linear_input_size, outputs)
  17. # 使用一个元素调用以确定下一个操作,或在优化期间调用批处理。返回张量
  18. def forward(self, x):
  19. x = F.relu(self.bn1(self.conv1(x)))
  20. x = F.relu(self.bn2(self.conv2(x)))
  21. x = F.relu(self.bn3(self.conv3(x)))
  22. return self.head(x.view(x.size(0), -1))

获取输入

下面的代码是用于从环境中提取和处理渲染图像的实用程序。它使用了torchvision包,这样就可以很容易地组合图像转换。运行单元后,它将显示它提取的示例帧。

  1. resize = T.Compose([T.ToPILImage(),
  2. T.Resize(40, interpolation=Image.CUBIC),
  3. T.ToTensor()])
  4. def get_cart_location(screen_width):
  5. world_width = env.x_threshold * 2
  6. scale = screen_width / world_width
  7. return int(env.state[0] * scale + screen_width / 2.0) # MIDDLE OF CART
  8. def get_screen():
  9. # 返回 gym 需要的400x600x3 图片, 但有时会更大,如800x1200x3. 将其转换为torch (CHW).
  10. screen = env.render(mode='rgb_array').transpose((2, 0, 1))
  11. # 车子在下半部分,因此请剥去屏幕的顶部和底部。
  12. screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
  13. view_width = int(screen_width * 0.6)
  14. cart_location = get_cart_location(screen_width)
  15. if cart_location < view_width // 2:
  16. slice_range = slice(view_width)
  17. elif cart_location > (screen_width - view_width // 2):
  18. slice_range = slice(-view_width, None)
  19. else:
  20. slice_range = slice(cart_location - view_width // 2,
  21. cart_location + view_width // 2)
  22. # 去掉边缘,这样我们就可以得到一个以车为中心的正方形图像。
  23. screen = screen[:, :, slice_range]
  24. # 转化为 float, 重新裁剪, 转化为 torch 张量(这并不需要拷贝)
  25. screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
  26. screen = torch.from_numpy(screen)
  27. # 重新裁剪,加入批维度 (BCHW)
  28. return resize(screen).unsqueeze(0).to(device)
  29. env.reset()
  30. plt.figure()
  31. plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
  32. interpolation='none')
  33. plt.title('Example extracted screen')
  34. plt.show()

训练

超参数和配置

此单元实例化模型及其优化器,并定义一些实用程序:

  • select_action- 将根据迭代次数贪婪策略选择一个行动。简单地说,我们有时会使用我们的模型来选择动作,有时我们只会对其中一个进行统一的采样。选择随机动作的概率将从 EPS_START 开始并以指数形式向 EPS_END衰减。 EPS_DECAY 控制衰减速率。
  • plot_durations- 一个帮助绘制迭代次数持续时间,以及过去100迭代次数的平均值(官方评估中使用的度量)。迭代次数将在包含主训练循环的单元下方,并在每迭代之后更新。
  1. BATCH_SIZE = 128
  2. GAMMA = 0.999
  3. EPS_START = 0.9
  4. EPS_END = 0.05
  5. EPS_DECAY = 200
  6. TARGET_UPDATE = 10
  7. # 获取屏幕大小,以便我们可以根据从ai-gym返回的形状正确初始化层。
  8. # 这一点上的典型尺寸接近3x40x90,这是在get_screen()中抑制和缩小的渲染缓冲区的结果。
  9. init_screen = get_screen()
  10. _, _, screen_height, screen_width = init_screen.shape
  11. # Get number of actions from gym action space
  12. n_actions = env.action_space.n
  13. policy_net = DQN(screen_height, screen_width, n_actions).to(device)
  14. target_net = DQN(screen_height, screen_width, n_actions).to(device)
  15. target_net.load_state_dict(policy_net.state_dict())
  16. target_net.eval()
  17. optimizer = optim.RMSprop(policy_net.parameters())
  18. memory = ReplayMemory(10000)
  19. steps_done = 0
  20. def select_action(state):
  21. global steps_done
  22. sample = random.random()
  23. eps_threshold = EPS_END + (EPS_START - EPS_END) * \
  24. math.exp(-1. * steps_done / EPS_DECAY)
  25. steps_done += 1
  26. if sample > eps_threshold:
  27. with torch.no_grad():
  28. # t.max(1) will return largest column value of each row.
  29. # second column on max result is index of where max element was
  30. # found, so we pick action with the larger expected reward.
  31. return policy_net(state).max(1)[1].view(1, 1)
  32. else:
  33. return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)
  34. episode_durations = []
  35. def plot_durations():
  36. plt.figure(2)
  37. plt.clf()
  38. durations_t = torch.tensor(episode_durations, dtype=torch.float)
  39. plt.title('Training...')
  40. plt.xlabel('Episode')
  41. plt.ylabel('Duration')
  42. plt.plot(durations_t.numpy())
  43. # Take 100 episode averages and plot them too
  44. if len(durations_t) >= 100:
  45. means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
  46. means = torch.cat((torch.zeros(99), means))
  47. plt.plot(means.numpy())
  48. plt.pause(0.001) # pause a bit so that plots are updated
  49. if is_ipython:
  50. display.clear_output(wait=True)
  51. display.display(plt.gcf())

训练循环

最后,训练我们的模型的代码。

在这里,你可以找到执行最优化的一个步骤的optimize_model功能。它执行优化的一个步骤。它首先对一批数据进行采样,将所有张量连接成一个张量,计算出

强化学习(DQN)教程 - 图22

强化学习(DQN)教程 - 图23

,并将它们组合成我们的损失。根据定义,如果

强化学习(DQN)教程 - 图24

是结束状态,我们设置

强化学习(DQN)教程 - 图25

。我们还使用目标网络来计算

强化学习(DQN)教程 - 图26

以增加稳定性。目标网络的权重大部分时间保持不变,但每隔一段时间就会更新一次策略网络的权重。这通常是一组步骤,但为了简单起见,我们将使用迭代次数。

  1. def optimize_model():
  2. if len(memory) < BATCH_SIZE:
  3. return
  4. transitions = memory.sample(BATCH_SIZE)
  5. # 转置批样本(有关详细说明,请参阅https://stackoverflow.com/a/19343/3343043)。这会将转换的批处理数组转换为批处理数组的转换。
  6. batch = Transition(*zip(*transitions))
  7. # 计算非最终状态的掩码并连接批处理元素(最终状态将是模拟结束后的状态)
  8. non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
  9. batch.next_state)), device=device, dtype=torch.uint8)
  10. non_final_next_states = torch.cat([s for s in batch.next_state
  11. if s is not None])
  12. state_batch = torch.cat(batch.state)
  13. action_batch = torch.cat(batch.action)
  14. reward_batch = torch.cat(batch.reward)
  15. # 计算Q(s_t, a)-模型计算 Q(s_t),然后选择所采取行动的列。这些是根据策略网络对每个批处理状态所采取的操作。
  16. state_action_values = policy_net(state_batch).gather(1, action_batch)
  17. # 计算下一个状态的V(s_{t+1})。非最终状态下一个状态的预期操作值是基于“旧”目标网络计算的;选择max(1)[0]的最佳奖励。这是基于掩码合并的,这样当状态为最终状态时,我们将获得预期状态值或0。
  18. next_state_values = torch.zeros(BATCH_SIZE, device=device)
  19. next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
  20. # 计算期望 Q 值
  21. expected_state_action_values = (next_state_values * GAMMA) + reward_batch
  22. # 计算 Huber 损失
  23. loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
  24. # 优化模型
  25. optimizer.zero_grad()
  26. loss.backward()
  27. for param in policy_net.parameters():
  28. param.grad.data.clamp_(-1, 1)
  29. optimizer.step()

接下来,你可以找到主训练循环。开始时,我们重置环境并初始化state张量。然后,我们对一个操作进行采样,执行它,观察下一个屏幕和奖励(总是1),并对我们的模型进行一次优化。当 episode 结束(我们的模型失败)时,我们重新启动循环。

num_episodes设置得很小。你可以下载并运行更多的epsiodes,比如300+来进行有意义的持续时间改进。

  1. num_episodes = 50
  2. for i_episode in range(num_episodes):
  3. # 初始化环境和状态
  4. env.reset()
  5. last_screen = get_screen()
  6. current_screen = get_screen()
  7. state = current_screen - last_screen
  8. for t in count():
  9. # 选择并执行动作
  10. action = select_action(state)
  11. _, reward, done, _ = env.step(action.item())
  12. reward = torch.tensor([reward], device=device)
  13. # 观察新状态
  14. last_screen = current_screen
  15. current_screen = get_screen()
  16. if not done:
  17. next_state = current_screen - last_screen
  18. else:
  19. next_state = None
  20. # 在内存中储存当前参数
  21. memory.push(state, action, next_state, reward)
  22. # 进入下一状态
  23. state = next_state
  24. # 记性一步优化 (在目标网络)
  25. optimize_model()
  26. if done:
  27. episode_durations.append(t + 1)
  28. plot_durations()
  29. break
  30. #更新目标网络, 复制在 DQN 中的所有权重偏差
  31. if i_episode % TARGET_UPDATE == 0:
  32. target_net.load_state_dict(policy_net.state_dict())
  33. print('Complete')
  34. env.render()
  35. env.close()
  36. plt.ioff()
  37. plt.show()

下面是一个图表,它说明了整个结果数据流。

img/reinforcement_learning_diagram.jpg

动作可以是随机选择的,也可以是基于一个策略,从gym环境中获取下一步的样本。我们将结果记录在回放内存中,并在每次迭代中运行优化步骤。优化从重放内存中随机抽取一批来训练新策略。 “旧的”target_net也用于优化计算预期的Q值;它偶尔会更新以保持其最新。

脚本的总运行时间: (0分钟0.000秒)