IT评测·应用市场-qidao123.com
标题:
PyTorch 深度学习实战(15):Twin Delayed DDPG (TD3) 算法
[打印本页]
作者:
农民
时间:
2025-3-17 04:13
标题:
PyTorch 深度学习实战(15):Twin Delayed DDPG (TD3) 算法
在上一篇文章中,我们先容了 Deep Deterministic Policy Gradient (DDPG) 算法,并使用它办理了 Pendulum 题目。本文将深入探究
Twin Delayed DDPG (TD3)
算法,这是一种改进的 DDPG 算法,可以或许有用办理 DDPG 中的过估计题目。我们将使用 PyTorch 实现 TD3 算法,并应用于经典的 Pendulum 题目。
一、TD3 算法基础
TD3 是 DDPG 的改进版本,通过引入以下三个关键技术来办理 DDPG 中的过估计题目:
双重 Critic 网络
:
使用两个 Critic 网络来估计 Q 值,从而减少过估计题目。
延迟更新
:
延迟 Actor 网络的更新,确保 Critic 网络更稳定地收敛。
目的战略平滑
:
在目的动作中参加噪声,从而减少 Critic 网络的过拟合。
1. TD3 的核心思想
双重 Critic 网络
:
使用两个 Critic 网络来估计 Q 值,取两者中的较小值作为目的 Q 值,从而减少过估计。
延迟更新
:
每更新 Critic 网络多次,才更新一次 Actor 网络,确保 Critic 网络更稳定地收敛。
目的战略平滑
:
在目的动作中参加噪声,从而减少 Critic 网络的过拟合。
2. TD3 的上风
减少过估计
:
通过双重 Critic 网络和目的战略平滑,TD3 可以或许有用减少 Q 值的过估计。
练习稳定
:
延迟更新战略确保 Critic 网络更稳定地收敛。
适用于连续动作空间
:
TD3 可以或许直接输出连续动作,适用于机器人控制、自动驾驶等任务。
3. TD3 的算法流程
使用当前战略采样一批数据。
使用目的网络盘算目的 Q 值。
更新 Critic 网络以最小化 Q 值的误差。
延迟更新 Actor 网络以最大化 Q 值。
更新目的网络。
重复上述过程,直到战略收敛。
二、Pendulum 题目实战
我们将使用 PyTorch 实现 TD3 算法,并应用于 Pendulum 题目。目的是控制摆杆使其保持直立。
1. 题目描述
Pendulum 情况的状态空间包括摆杆的角度和角速率。动作空间是一个连续的扭矩值,范围在 −2,2 之间。智能体每保持摆杆直立一步,就会得到一个负的奖励,目的是最大化累积奖励。
2. 实现步骤
安装并导入须要的库。
界说 Actor 网络和 Critic 网络。
界说 TD3 练习过程。
测试模型并评估性能。
3. 代码实现
以下是完备的代码实现:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
# 改进的 Actor 网络(增加层归一化)
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super(Actor, self).__init__()
self.l1 = nn.Linear(state_dim, 256)
self.ln1 = nn.LayerNorm(256)
self.l2 = nn.Linear(256, 256)
self.ln2 = nn.LayerNorm(256)
self.l3 = nn.Linear(256, action_dim)
self.max_action = max_action
def forward(self, x):
x = F.relu(self.ln1(self.l1(x)))
x = F.relu(self.ln2(self.l2(x)))
x = torch.tanh(self.l3(x)) * self.max_action
return x
# 改进的 Critic 网络(增加层归一化)
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
self.l1 = nn.Linear(state_dim + action_dim, 256)
self.ln1 = nn.LayerNorm(256)
self.l2 = nn.Linear(256, 256)
self.ln2 = nn.LayerNorm(256)
self.l3 = nn.Linear(256, 1)
def forward(self, x, u):
x = F.relu(self.ln1(self.l1(torch.cat([x, u], 1))))
x = F.relu(self.ln2(self.l2(x)))
x = self.l3(x)
return x
class TD3:
def __init__(self, state_dim, action_dim, max_action):
self.actor = Actor(state_dim, action_dim, max_action).to(device)
self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
self.actor_target.load_state_dict(self.actor.state_dict())
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=3e-4)
self.critic1 = Critic(state_dim, action_dim).to(device)
self.critic2 = Critic(state_dim, action_dim).to(device)
self.critic1_target = Critic(state_dim, action_dim).to(device)
self.critic2_target = Critic(state_dim, action_dim).to(device)
self.critic1_target.load_state_dict(self.critic1.state_dict())
self.critic2_target.load_state_dict(self.critic2.state_dict())
self.critic1_optimizer = optim.Adam(self.critic1.parameters(), lr=3e-4)
self.critic2_optimizer = optim.Adam(self.critic2.parameters(), lr=3e-4)
self.max_action = max_action
self.replay_buffer = deque(maxlen=1000000)
self.batch_size = 256
self.gamma = 0.99
self.tau = 0.005
self.policy_noise = 0.2
self.noise_clip = 0.5
self.policy_freq = 2
self.total_it = 0
self.exploration_noise = 0.1 # 新增探索噪声
def select_action(self, state, add_noise=True):
state = torch.FloatTensor(state.reshape(1, -1)).to(device)
action = self.actor(state).cpu().data.numpy().flatten()
if add_noise:
noise = np.random.normal(0, self.exploration_noise, size=action_dim)
action = (action + noise).clip(-self.max_action, self.max_action)
return action
def train(self):
if len(self.replay_buffer) < self.batch_size:
return
self.total_it += 1
batch = random.sample(self.replay_buffer, self.batch_size)
state = torch.FloatTensor(np.array([t[0] for t in batch])).to(device)
action = torch.FloatTensor(np.array([t[1] for t in batch])).to(device)
reward = torch.FloatTensor(np.array([t[2] for t in batch])).reshape(-1, 1).to(device) / 10.0 # 奖励缩放
next_state = torch.FloatTensor(np.array([t[3] for t in batch])).to(device)
done = torch.FloatTensor(np.array([t[4] for t in batch])).reshape(-1, 1).to(device)
with torch.no_grad():
noise = (torch.randn_like(action) * self.policy_noise).clamp(-self.noise_clip, self.noise_clip)
next_action = (self.actor_target(next_state) + noise).clamp(-self.max_action, self.max_action)
target_Q1 = self.critic1_target(next_state, next_action)
target_Q2 = self.critic2_target(next_state, next_action)
target_Q = torch.min(target_Q1, target_Q2)
target_Q = reward + (1 - done) * self.gamma * target_Q
current_Q1 = self.critic1(state, action)
current_Q2 = self.critic2(state, action)
critic1_loss = F.mse_loss(current_Q1, target_Q)
critic2_loss = F.mse_loss(current_Q2, target_Q)
self.critic1_optimizer.zero_grad()
critic1_loss.backward()
torch.nn.utils.clip_grad_norm_(self.critic1.parameters(), 1.0) # 梯度裁剪
self.critic1_optimizer.step()
self.critic2_optimizer.zero_grad()
critic2_loss.backward()
torch.nn.utils.clip_grad_norm_(self.critic2.parameters(), 1.0)
self.critic2_optimizer.step()
if self.total_it % self.policy_freq == 0:
actor_loss = -self.critic1(state, self.actor(state)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 1.0)
self.actor_optimizer.step()
for param, target_param in zip(self.critic1.parameters(), self.critic1_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for param, target_param in zip(self.critic2.parameters(), self.critic2_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
def save(self, filename):
torch.save(self.actor.state_dict(), filename + "_actor.pth")
def train_td3(env, agent, episodes=2000, early_stop_threshold=-150):
rewards_history = []
moving_avg = []
best_avg = -np.inf
for ep in range(episodes):
state,_ = env.reset()
episode_reward = 0
done = False
step = 0
while not done:
# 线性衰减探索噪声
if ep < 300:
agent.exploration_noise = max(0.5 * (1 - ep / 300), 0.1)
else:
agent.exploration_noise = 0.1
action = agent.select_action(state, add_noise=(ep < 100)) # 前100轮强制探索
next_state, reward, done, _, _ = env.step(action)
agent.replay_buffer.append((state, action, reward, next_state, done))
state = next_state
episode_reward += reward
agent.train()
step += 1
rewards_history.append(episode_reward)
current_avg = np.mean(rewards_history[-50:])
moving_avg.append(current_avg)
if current_avg > best_avg:
best_avg = current_avg
agent.save("td3_pendulum_best")
if (ep + 1) % 50 == 0:
print(f"Episode: {ep + 1}, Avg Reward: {current_avg:.2f}")
# 早停机制
if current_avg >= early_stop_threshold:
print(f"早停触发,平均奖励达到 {current_avg:.2f}")
break
return moving_avg, rewards_history
# 训练并可视化
td3_agent = TD3(state_dim, action_dim, max_action)
moving_avg, rewards_history = train_td3(env, td3_agent, episodes=2000)
# 可视化结果
plt.figure(figsize=(12, 6))
plt.plot(rewards_history, alpha=0.6, label='single round reward')
plt.plot(moving_avg, 'r-', linewidth=2, label='moving average (50 rounds)')
plt.xlabel('episodes')
plt.ylabel('reward')
plt.title('TD3 training performance on Pendulum-v1')
plt.legend()
plt.grid(True)
plt.show()
复制代码
三、代码剖析
Actor 和 Critic 网络
:
Actor 网络输出连续动作,通过 tanh 函数将动作限定在 −max_action,max_action 范围内。
Critic 网络输出状态-动作对的 Q 值。
TD3 练习过程
:
使用当前战略采样一批数据。
使用目的网络盘算目的 Q 值。
更新 Critic 网络以最小化 Q 值的误差。
延迟更新 Actor 网络以最大化 Q 值。
更新目的网络。
练习过程
:
在练习过程中,每 50 个 episode 打印一次平均奖励。
练习竣事后,绘制练习过程中的总奖励曲线。
四、运行结果
运行上述代码后,你将看到以下输出:
练习过程中每 50 个 episode 打印一次平均奖励。
练习竣事后,绘制练习过程中的总奖励曲线。
五、总结
本文先容了 TD3 算法的基本原理,并使用 PyTorch 实现了一个简单的 TD3 模型来办理 Pendulum 题目。通过这个例子,我们学习了如何使用 TD3 算法进行连续动作空间的战略优化。
在下一篇文章中,我们将探究更高级的强化学习算法,如 Soft Actor-Critic (SAC)。敬请期待!
代码实例说明
:
本文代码可以直接在 Jupyter Notebook 或 Python 脚本中运行。
假如你有 GPU,代码会自动检测并使用 GPU 加速。
希望这篇文章能帮助你更好地明白 TD3 算法!假如有任何题目,接待在批评区留言讨论。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4