• 教程 >
  • 强化学习 (DQN) 教程
Shortcuts

强化学习 (DQN) 教程|

作者Adam Paszke

本教程演示如何使用 PyTorch 在OpenAI 健身房的 CartPole-v0 任务上训练深度 Q 学习 (DQN) 代理。

任务

代理必须两个操作(向左或向右移动推车)之间决定,以便连接到它的杆保持直立。 你可以在健身房网站上找到一个官方排行榜,其中有各种算法和可视化效果。

cartpole

手推车|

当代理观察环境的当前状态并选择操作时,环境将转换为新状态,并返回指示操作结果的奖励。 在此任务中,每个增量时间步长的奖励为 +1,如果极点跌得太远或推车移动超过 2.4 个单位远离中心,则环境终止。 这意味着性能更好的方案将运行更长时间,从而累积更大的回报。

CartPole 任务的设计使代理的输入是表示环境状态(位置、速度等)的 4 个实际值。 但是,神经网络可以完全通过查看场景来解决问题,因此我们将使用以购物车为中心的屏幕补丁作为输入。 因此,我们的结果与官方排行榜的结果没有直接可比性——我们的任务要困难得多。 不幸的是,这确实减慢了训练速度,因为我们必须渲染所有帧。

严格地说,我们将状态作为当前屏幕修补程序和前一个修补程序之间的差异显示。 这将允许代理从一个图像中考虑极点的速度。

首先,让我们导入所需的包。 首先,我们需要健身房的环境(安装使用pip安装健身房)。 我们还将使用以下来自 PyTorch 的以下内容:

  • 神经网络 (torch.nn

  • 优化 (torch.optim

  • 自动分化 (torch.autograd

  • 用于视觉任务的实用程序 (torchvision-单独的包)。

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


env = gym.make('CartPole-v0').unwrapped

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

重放内存|

我们将使用体验重播内存来训练我们的 DQN。 它存储代理观察到的转换,允许我们以后重用此数据。 通过随机抽样,建立批处理的过渡是不相关的。 已经表明,这极大地稳定并改进了DQN训练程序。

为此,我们需要两个类:

  • Transition- 表示环境中单个转换的命名元组。 它实质上映射(状态,操作)对到他们的(next_state,奖励)结果,状态是屏幕差异图像,如后来所述。

  • ReplayMemory- 一种具有边界大小的循环缓冲区,用于保存最近观察到的过渡。 它还实现了一个.sample()方法,用于选择随机一批转换以进行训练。

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

现在,让我们定义我们的模型。 但首先,让我们快速回顾一下DQN是什么。

DQN 算法|

我们的环境是确定性的,因此这里介绍的所有方程也是为简单起见而确定的。 在强化学习文献中,它们也会包含对环境中随机转换的期望。

我们的目标是培训一项政策,试图最大化贴现的累积回报[(R_[t_0] =sum_{t_0}\gamma_t - t_0] r_t)),其中[(R_]t_0])也称为回报 折扣([(gamma])应该是一个常数,介于+(0})+(1})之间,以确保总和收敛。 它使不确定的未来对我们的经纪人来说不那么重要,在不远的将来,它是相当自信的。

Q-learning 背后的主要思想是,如果我们具有一个函数[(Q]: 状态 [时间操作]右箭头 [mathbb]R]),它可以告诉我们我们的回报是什么,如果我们在给定状态下执行一个操作,那么我们可以轻松地构建一个策略来最大化我们的回报:

\[\pi^*(s) = \arg\!\max_a \ Q^*(s, a)\]

但是,我们并不了解世界的一切,因此我们无法访问[(Q])。 但是,由于神经网络是通用函数近似器,我们可以简单地创建一个并训练它类似于+(Q_)。

对于我们的培训更新规则,我们将使用一个事实,即某些策略的每个\(Q])函数都遵循 Bellman 等式:

\[Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s'))\]

相等的两侧之间的差异称为时差误差,[(\delta]):

\[\delta = Q(s, a) - (r + \gamma \max_a Q(s', a))\]

为了尽量减少这个错误,我们将使用Huber损失 当误差很小时,Huber 损耗类似于均方误差,但类似于误差较大的均值绝对误差 - 当[(Q])的估计值非常嘈杂时,对异常值的平均值更加可靠。 我们从重播记忆中采样的一批转换中计算此数据:* (B+)

\[\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)\]
\[\begin{split}\text{where} \quad \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}{\delta^2} & \text{for } |\delta| \le 1, \\ |\delta| - \frac{1}{2} & \text{otherwise.} \end{cases}\end{split}\]

Q-网络|

我们的模型将是一个卷积神经网络,它考虑到当前和以前的屏幕补丁之间的差异。 它有两个输出,表示\(q,[mathrm]左)))[(q,[mathrm]右))(其中[(s]是网络的输入)。 实际上,网络正在尝试预测给定当前输入执行每个操作的预期回报

class DQN(nn.Module):

    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 5, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))

输入提取|

下面的代码是用于从环境中提取和处理呈现的图像的实用程序。 它使用torchvision包,这使得它很容易组成图像变换。 运行单元格后,它将显示提取的示例修补程序。

resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])


def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART

def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Cart is in the lower half, so strip off the top and bottom of the screen
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have a square image centered on a cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)


env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
           interpolation='none')
plt.title('Example extracted screen')
plt.show()

训练

超参数和实用程序|

此单元格实例化我们的模型及其优化器,并定义一些实用程序:

  • select_action - 将相应地选择一个操作来执行 epsilon 贪婪策略。 简单地说,我们有时会使用模型来选择操作,有时我们只需均匀地采样一个。 选择随机操作的概率将从EPS_START开始,并呈指数EPS_END,EPS_END。 EPS_DECAY控制衰变的速度。

  • plot_durations - 绘制剧集持续时间的帮手,以及过去 100 集的平均值(官方评估中使用的度量)。 绘图将位于包含主训练循环的单元格下方,并在每集后更新。

BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

# Get screen size so that we can initialize layers correctly based on shape
# returned from AI gym. Typical dimensions at this point are close to 3x40x90
# which is the result of a clamped and down-scaled render buffer in get_screen()
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape

# Get number of actions from gym action space
n_actions = env.action_space.n

policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())

训练循环

最后,训练模型的代码。

在这里,您可以找到执行优化单个步骤optimize_model函数。 它首先对一批进行采样,将所有张子串联成一个,计算[(Q(s_t、a_t))和 \(v(s_{t{1}) = [max_a Q(s_{1},a))),并将它们合并到我们的损失中。 如果[(s])是终端状态,则通过除颤设置[(V)= 0])。 我们还使用目标网络计算+(v(s_{t}1}))以增加稳定性。 目标网络的权重大部分时间都保持冻结,但会经常更新策略网络的权重。 这通常是一组步骤,但我们将使用单集为简单。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.uint8)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

下面,您可以找到主训练循环。 在开始时,我们重置环境并初始化state张力。 然后,我们采样一个操作,执行它,观察下一个屏幕和奖励(总是 1),并优化我们的模型一次。 当情节结束时(我们的模型失败),我们重新启动循环。

下面,num_episodes设置较小。 您应该下载笔记本并运行更多 epsiodes,例如 300+,以便进行有意义的持续时间改进。

num_episodes = 50
for i_episode in range(num_episodes):
    # Initialize the environment and state
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # Observe new state
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the target network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()

下图说明了总体结果数据流。

../_images/reinforcement_learning_diagram.jpg

操作是随机选择的,也可以基于策略选择,从健身环境中获取下一步示例。 我们在重播内存中记录结果,并在每次迭代时运行优化步骤。 优化从重播内存中随机选取一批以执行新策略的训练。 "旧"target_net还用于优化以计算预期的 Q 值;它偶尔更新,以保持其最新。

脚本总运行时间: ( 0 分钟 0.000 秒)

由狮身人面像库生成的画廊