手记

详解 DDPG:解决连续动作控制难题的算法

了解DDPG如何解决连续动作控制的问题,开启AI驱动的医疗机器人领域中的潜力。

想象你正在控制一个外科手术中的机器人手臂。可能涉及的离散动作有:……

  • 上移,
  • 下移,
  • 抓,
  • 松开,

这些指令简单明了,执行起来很容易,尤其是在简单的场景里。

但是,如果是完成精细的动作,比如:

  • 将手臂移动0.5毫米以避免伤害组织
  • 施加3N的力量来压缩组织
  • 将手腕旋转15°来调整切口角度

在这些情况下,仅仅选择一个动作是不够的,你需要决定需要多少那样的动作。这就是所谓的连续动作空间的世界,而这正是深度确定性策略梯度(DDPG)真正发挥作用的地方!

传统的深度Q网络(DQN)在处理离散动作时效果不错,但在处理连续动作时就显得吃力了。相比之下,确定性策略梯度(DPG)解决了这个问题,但仍存在探索不足及不稳定的问题。DDPG 首次在T P. Lillicrap等人的论文中被提出,结合了DPGDQN的优点,提升了在连续动作空间环境中稳定性和表现。

在这篇文章里,我们将讨论DDPG的理论基础和架构,展示其在Python中的实现过程,通过在MountainCarContinuous游戏中测试其表现来评估其性能,并简要讨论DDPG如何在生物工程领域中被应用。

DDPG框架

DQN不同,DQN会评估每个可能的状态-动作对来找到最佳动作,DPG使用Actor-Critic架构。Actor学习直接将状态映射为动作的策略,避免了穷尽搜索,专注于为每个状态找到最佳动作。

不过,DPG遇到两个主要的难题:

  1. 这是一个确定性的算法,它会限制对动作空间的探索
  2. 由于学习过程中的不稳定性问题,它无法有效地利用神经网络。

DDPG 通过引入奥恩斯坦-乌伦贝克过程产生的 探索噪声 来改进 DPG,并使用 批归一化 和 DQN 的一些技术(例如 回放缓冲区目标网络)来稳定训练。

通过这些改进,DDPG非常适合于训练在连续的动作空间中的代理,例如在生物工程应用中的机器人控制。

接下来,让我们来研究DDPG模型的主要内容吧!

Actor-Critic框架

  • 策略网络(Actor):告诉代理在当前状态下应该采取什么行动。参数θμ表示权重。

小贴士! 将行动者网络(Actor Network)视为做决定的人:它将当前状态对应到一个动作。

  • 评判者(Q值网络):通过估算该状态下行动的Q值来评估行动的价值。

提示!可以将Critic Network视为评判网络,它为每个动作给出一个质量评分,并帮助行为者改进策略,以确保它确实能找到在每个状态下最佳的动作。

注意!评论家会使用估计的Q值,来完成两件任务:

改进演员政策(政策更新)

演员的目标是调整其参数θμ,以便输出使得评估者的Q值最大化的行动。

为了做到这一点,Actor 需要明白所选动作 a 对 Critic 的 Q 值的影响,以及其内部参数如何影响其策略,这通过策略梯度方程实现。它代表了从 minibatch 中计算的所有梯度的平均值。

2. 通过最小化如下的损失函数来优化其自身的网络(Q值网络的更新)。

其中 N 表示小批量中采样的经验数量,$y_i$ 是如下计算出的目标 Q 值。

回放缓冲

随着智能体探索环境,过去的经历(状态、行动、奖励和下一个状态)会被存储为元组(s, a, r, s′)在回放缓冲中。训练时,会随机抽取这些经历的一小批样本来训练智能体。

问题! 回放缓冲区是如何真正减少不稳定性呢?

通过随机抽取经验,回放缓冲区减少了连续样本的相关性,减少了偏差性,从而使得训练更加稳定。

目标网络系统

目标网络是Actor和Critic的缓慢地更新副本。它们提供了稳定的Q值参考,防止快速波动,并确保平稳和一致的更新。

问题 目标网络是如何实际减小不稳定性的?

没有Critic目标网络时,目标Q值直接从Critic Q值网络计算得出,后者持续更新。这导致每次目标Q值都会变化,产生一个“移动目标”问题。结果,Critic不断追赶一个不断变化的目标,导致训练不稳定。

另外,由于Actor(行为者)依赖于Critic(评判者)的反馈,一个网络中的错误会放大另一个网络中的错误,从而形成一个相互依存的不稳定性反馈循环。

通过引入目标网络并使用软更新规则逐步更新它们,我们使目标Q值保持更为稳定,减少突然变化,从而提高学习稳定性。

批量标准化

批量归一化神经网络每一层的输入,使其均值接近零,方差接近一。

问题:批归一化到底是如何减少模型的不稳定性?

从回放缓冲区抽样的样本可能与实时数据的分布不一致,这可能导致网络更新过程中的不稳定。

批量归一化确保输入数据分布的一致性缩放,防止由于输入数据分布的变化而导致的波动更新。

探索噪音

因为Actor的策略是固定的,所以在训练过程中向动作添加探索噪声,以鼓励代理尽可能探索动作范围。

在DDPG的论文中,作者使用了奥恩斯坦-乌伦贝克(Ornstein-Uhlenbeck)过程来生成与时间相关的噪声,以模仿现实世界的系统动态。

DDPG 伪代码:逐步解析

伪代码摘自http://arxiv.org/abs/1509.02971,(见文中参考文献1)

作者绘制的图

  • 让我们定义行动者和评判者网络
     class Actor(nn.Module):  
        """  
        DDPG算法中的Actor网络。  
        """  
        def __init__(self, state_dim, action_dim, max_action, use_batch_norm):  
            """  
            初始化Actor策略网络。

            :param state_dim: 状态空间的维度
            :param action_dim: 动作空间的维度
            :param max_action: 最大动作
            :param use_batch_norm: 是否使用批归一化
            """  
            super(Actor, self).__init__()  
            self.bn1 = nn.LayerNorm(HIDDEN_LAYERS_ACTOR) if use_batch_norm else nn.Identity()  
            self.bn2 = nn.LayerNorm(HIDDEN_LAYERS_ACTOR) if use_batch_norm else nn.Identity()  

            self.l1 = nn.Linear(state_dim, HIDDEN_LAYERS_ACTOR)  
            self.l2 = nn.Linear(HIDDEN_LAYERS_ACTOR, HIDDEN_LAYERS_ACTOR)  
            self.l3 = nn.Linear(HIDDEN_LAYERS_ACTOR, action_dim)  
            self.max_action = max_action  

        def forward(self, state):  
            """  
            前向传播。

            :param state: 输入状态
            :return: 动作
            """  
            a = torch.relu(self.bn1(self.l1(state)))  
            a = torch.relu(self.bn2(self.l2(a)))  
            return self.max_action * torch.tanh(self.l3(a))  

    class Critic(nn.Module):  
        """  
        DDPG算法中的Critic网络。  
        """  
        def __init__(self, state_dim, action_dim, use_batch_norm):  
            """  
            初始化Critic价值网络。

            :param state_dim: 状态空间的维度
            :param action_dim: 动作空间的维度
            :param use_batch_norm: 是否使用批归一化
            """  
            super(Critic, self).__init__()  
            self.bn1 = nn.BatchNorm1d(HIDDEN_LAYERS_CRITIC) if use_batch_norm else nn.Identity()  
            self.bn2 = nn.BatchNorm1d(HIDDEN_LAYERS_CRITIC) if use_batch_norm else nn.Identity()  
            self.l1 = nn.Linear(state_dim + action_dim, HIDDEN_LAYERS_CRITIC)  

            self.l2 = nn.Linear(HIDDEN_LAYERS_CRITIC, HIDDEN_LAYERS_CRITIC)  
            self.l3 = nn.Linear(HIDDEN_LAYERS_CRITIC, 1)  

        def forward(self, state, action):  
            """  
            前向传播。

            :param state: 输入状态
            :param action: 输入动作
            :return: 状态和动作对的Q值
            """  
            q = torch.relu(self.bn1(self.l1(torch.cat([state, action], 1))))  
            q = torch.relu(self.bn2(self.l2(q)))  
            return self.l3(q)
  • 定义回放缓

实现了 ReplayBuffer 类来存储和采样前一节中讨论的状态转换元组 (s, a, r, s’), 以实现小批量异策略学习。

    class ReplayBuffer:  
        """
        重放缓冲区类,用于存储和采样经验数据。
        """
        def __init__(self, capacity):  
            """
            初始化重放缓冲区,设置最大容量。
            """
            self.buffer = deque(maxlen=capacity)  

        def push(self, state, action, reward, next_state, done):  
            """
            将状态、动作、奖励、下一个状态和完成标志添加到缓冲区。
            """
            self.buffer.append((state, action, reward, next_state, done))  

        def sample(self, batch_size):  
            """
            从缓冲区中随机采样指定数量的数据。
            """
            return random.sample(self.buffer, batch_size)  

        def __len__(self):  
            """
            返回缓冲区的长度。
            """
            return len(self.buffer)
  • 定义OU噪声类

添加了 OUNoise 类来生成探索噪音,帮助智能体更有效地探索动作范围。

    """  
    摘自 https://github.com/vitchyr/rlkit/blob/master/rlkit/exploration_strategies/ou_strategy.py  
    """  
    class OUNoise:  
        def __init__(self, action_space, mu=0.0, theta=0.15, max_sigma=0.3, min_sigma=0.3, decay_period=100000):  
            self.mu           = mu  
            self.theta        = theta  
            self.sigma        = max_sigma  
            self.max_sigma    = max_sigma  
            self.min_sigma    = min_sigma  
            self.decay_period = decay_period  
            self.action_dim   = action_space.shape[0]  
            self.low          = action_space.low  
            self.high         = action_space.high  
            self.reset()  

        def reset(self):  
            # 重置噪声状态  
            self.state = np.ones(self.action_dim) * self.mu  

        def 更新状态(self):  
            x  = self.state  
            dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(self.action_dim)  
            self.state = x + dx  
            return self.state  

        def 获取动作(self, action, t=0):   
            # 获取动作,随着时间的推移衰减噪声  
            ou_state = self.更新状态()  
            self.sigma = self.max_sigma - (self.max_sigma - self.min_sigma) * min(1.0, t / self.decay_period)  
            return np.clip(action + ou_state, self.low, self.high)
  • 定义DDPG模型

定义了一个 DDPG 类,该类封装了代理的行为模式:

  1. 首先,让我们初始化一下,创建 Actor 和 Critic 网络,以及它们的目标网络(target网络)和重放缓存区。
    class DDPG():  
        """  
        深度确定性策略梯度(DDPG)智能体。  
        """  
        def __init__(self, state_dim, action_dim, max_action, use_batch_norm):  
            """  
            初始化DDPG。  

            :param state_dim: 状态空间的维数  
            :param action_dim: 动作空间的维数  
            :param max_action: 动作的最大幅度  
            """  
            # [步骤0]  
            # 初始化Actor策略网络  
            self.actor = Actor(state_dim, action_dim, max_action, use_batch_norm)  
            # 初始化Actor目标网络,权重与Actor的策略网络一致  
            self.actor_target = Actor(state_dim, action_dim, max_action, use_batch_norm)  
            self.actor_target.load_state_dict(self.actor.state_dict())  
            self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=ACTOR_LR)  

            # 初始化Critic价值网络  
            self.critic = Critic(state_dim, action_dim, use_batch_norm)  
            # 初始化Critic目标网络,权重与Critic的价值网络一致  
            self.critic_target = Critic(state_dim, action_dim, use_batch_norm)  
            self.critic_target.load_state_dict(self.critic.state_dict())  
            self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=CRITIC_LR)  

            # 初始化回放缓冲区  
            self.replay_buffer = ReplayBuffer(BUFFER_SIZE)
  1. 选择动作:select_action 方法根据当前策略来选择动作。
        def select_action(self, state):  
            """  
            根据当前的状态选择一个动作。  

            :param state: 当前状态  
            :return: 返回动作  
            """  
            state = torch.FloatTensor(state.reshape(1, -1))  
            action = self.actor(state).cpu().data.numpy().flatten()  
            return action
  • 3. train 方法说明了网络如何通过经验回放缓冲区来学习和更新。

注意! 由于论文中介绍了使用目标网络和批归一化来提高稳定性,我设计了 train 方法,使其可以让我们启用或禁用这些功能。这让我们可以在有或没有这些功能的情况下进行比较。请参阅下方代码以了解实现。

        def train(self, use_target_network, use_batch_norm):  
            """  
            训练DDPG智能体。  

            :param use_target_network: 是否使用目标网络  
            :param use_batch_norm: 是否使用批归一化  
            """  
            if len(self.replay_buffer) < BATCH_SIZE:  
                return  

            # [步骤4]. 从重放缓冲区中采样一批数据,  
            batch = self.replay_buffer.sample(BATCH_SIZE)  
            state, action, reward, next_state, done = map(np.stack, zip(*batch))  

            state = torch.FloatTensor(state)  
            action = torch.FloatTensor(action)  
            next_state = torch.FloatTensor(next_state)  
            reward = torch.FloatTensor(reward.reshape(-1, 1))  
            done = torch.FloatTensor(done.reshape(-1, 1))  

            # 批评网络更新 #,  
            if use_target_network:  
                target_Q = self.critic_target(next_state, self.actor_target(next_state))  
            else:  
                target_Q = self.critic(next_state, self.actor(next_state))  

            # [步骤5]. 计算目标Q值 (y_i),  
            target_Q = reward + (1 - done) * GAMMA * target_Q  
            current_Q = self.critic(state, action)  
            critic_loss = nn.MSELoss()(current_Q, target_Q.detach())  

            # [步骤6]. 使用梯度下降来更新批评网络的权重,以最小化损失函数,  
            self.critic_optimizer.zero_grad()  
            critic_loss.backward()  
            self.critic_optimizer.step()  

            # 行为网络更新 #,  
            actor_loss = -self.critic(state, self.actor(state)).mean()  

            # [步骤7]. 使用梯度下降来更新行为网络的权重,  
            # 以最小化损失函数并最大化Q值 => 选择可获得最高累积回报的动作,  
            self.actor_optimizer.zero_grad()  
            actor_loss.backward()  
            self.actor_optimizer.step()  

            # [步骤8]. 更新目标网络,  
            if use_target_network:  
                for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):  
                    target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)  

                for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):  
                    target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
  • 训练DDPG智能体。

将所有定义的类和方法一起结合起来,我们可以训练DDPG。我的train_dppg函数按照伪代码和DDPG模型结构图。

提示: 为了便于你理解,给每个代码部分标上了对应的步骤号。希望这能帮到你!哈!

    def train_ddpg(use_target_network, use_batch_norm, num_episodes=NUM_EPISODES):  
        """  
        训练DDPG代理程序。  

        :param use_target_network: 是否使用目标网络  
        :param use_batch_norm: 是否使用批处理标准化  
        :param num_episodes: 要训练的集数  
        :return: 集数奖励的列表  
        """  
        agent = DDPG(state_dim, action_dim, 1,use_batch_norm)  

        episode_rewards = []  
        noise = OUNoise(env.action_space)  

        for episode in range(num_episodes):  
            state = env.reset()  
            noise.reset()  
            episode_reward = 0  
            done = False  
            step = 0  
            while not done:  
                action_actor = agent.select_action(state)  
                action = noise.get_action(action_actor,step)  # 添加噪声进行探索  
                next_state, reward, done,_ = env.step(action)  
                done = float(done) if isinstance(done, (bool, int)) else float(done[0])  # 如果done是bool或int类型,保持不变,否则转为浮点数  
                agent.replay_buffer.push(state, action, reward, next_state, done)  

                if len(agent.replay_buffer) > BATCH_SIZE:  
                    agent.train(use_target_network,use_batch_norm)  

                state = next_state  
                episode_reward += reward  
                step += 1  

            episode_rewards.append(episode_reward)  

            if (episode + 1) % 10 == 0:  
                print(f"第 {episode + 1} 集:奖励 = {episode_reward}")  

        return agent, episode_rewards
性能与成果:评估DDPG的有效性

DDPG在连续动作空间的有效性在MountainCarContinuous-v0环境中进行了测试,在该环境中,智能体学习如何通过获得动量来驾驶汽车爬上陡峭的山坡。结果表明,使用Target NetworksBatch Normalization可以实现更快的收敛速度,更高的回报,以及比其他配置更稳定的训练过程。

作者自己画的这张图

作者制作的 GIF

注意! 您可以自己动手在任何选择的环境中运行我在我的 GitHub 仓库上提供的代码,只需更改环境名称即可!

生物工程中的DDPG:追求精确与适应能力

通过这篇博客,我们了解到DDPG是一种在连续动作空间环境中训练代理的强大算法。通过结合DPG和DQN的技术手段,DDPG提高了探索、稳定性和性能——这些都是机器人手术和生物工程应用中的关键因素。

想象一个像 达芬奇系统 那样的机器人外科医生使用 DDPG 实时精细操作,确保没有误差的精确调整。机器人可以通过 DDPG 将手臂的位置调整到毫米级别,在缝合时施加精确力度,甚至进行微小手腕旋转以达到最佳切口效果。这种实时精准度能改变手术结果,缩短恢复时间,并减少人为错误。

但DDPG的潜力不仅限于外科手术。它已经在生物工程领域取得进展,使机器人假肢和辅助设备能够像人类四肢一样自然地运动(点击这里查看这篇非常有趣的文章!)。

现在我们已经介绍了 DDPG 的理论基础,是时候开始尝试实现它了。从简单的例子入手,慢慢尝试更复杂的场景。

参考资料
  1. Lillicrap TP, Hunt JJ, Pritzel A, Heess N, Erez T, Tassa Y, 等. 基于深度强化学习的连续控制 [互联网]. arXiv; 2019 年. 可访问: http://arxiv.org/abs/1509.02971
0人推荐
随时随地看视频
慕课网APP