[Reinforcement learning environment setup + reproducing a GitHub UAV reinforcement learning demo]
https://i-blog.csdnimg.cn/direct/6fb9c25433004bb2a8176bb9837e7e05.png
1. NVIDIA driver installation
First, check your hardware with a system-information tool. This walkthrough uses an RTX 2080 Ti. Open the NVIDIA driver download page at https://www.nvidia.cn/drivers/lookup/, fill in your system configuration manually, and click Search.
https://i-blog.csdnimg.cn/direct/1ef09e9dc18d49c3bc1b459c4f78717e.png
If you do not play games, the NVIDIA Studio driver is recommended; click the View button.
https://i-blog.csdnimg.cn/direct/d5bfb2d273ea42a2902cfa0be79ed4f9.png
Click Download to get the latest driver.
https://i-blog.csdnimg.cn/direct/14000ef8f9894b4fb2d3ae75738db278.png
Double-click the installer and choose an installation location.
https://i-blog.csdnimg.cn/direct/1ebac714ec4343ed945fe580de555dec.png
Wait patiently for the installation to finish.
https://i-blog.csdnimg.cn/direct/d3e38d91a0854186babaa2eead27eb6b.png
https://i-blog.csdnimg.cn/direct/b96a095e57724eac9d38b4fbb92e562b.png
https://i-blog.csdnimg.cn/direct/b148fdf43762449cbfb93fc63b7e4d35.png
https://i-blog.csdnimg.cn/direct/eba29ff2e6054ec5bc6f4ecf18935b37.png
After installing (or updating) the graphics driver, check its version. Press Win+R, open a cmd window, and enter the following command:
nvidia-smi
The output, shown in the screenshot below, reports driver version 565.90 and a maximum supported CUDA version of 12.7. Knowing the highest CUDA version the driver supports is what we need to pick the right packages later.
https://i-blog.csdnimg.cn/direct/0c5c02ae186042398dc22b57121075a3.png
2. Anaconda installation
Open https://www.anaconda.com/download/success. As of November 2024 the current Anaconda ships with Python 3.12; older releases (or ones bundling a lower Python version) are also available. Depending on how much disk space you have, choose either Anaconda or Miniconda.
https://i-blog.csdnimg.cn/direct/986f41f3273b42b185b2dfc063ccd16f.png
Installing conda
Run the installer as administrator and click Next.
https://i-blog.csdnimg.cn/direct/5131ac369d104f68b81b461dc4dcb8d7.png
https://i-blog.csdnimg.cn/direct/a6570369b9bc496f92abc33be5550492.png
https://i-blog.csdnimg.cn/direct/1840f5b9aefe4d97b028dc7382166c01.png
Change the installation location
https://i-blog.csdnimg.cn/direct/5151749e0c6c4250bb85c364cb6dacde.png
https://i-blog.csdnimg.cn/direct/ef59c51c71dd4028a577ec51c353be7f.png
https://i-blog.csdnimg.cn/direct/a1437c10fff14149837d418603562d79.png
Installation is now complete. It is worth checking your environment variables and adding the entries below so that editors and build tools can find conda later (a quick check follows the list).
Note: replace the E:\Anaconda part with wherever you installed Anaconda.
E:\Anaconda (needed for Python itself)
E:\Anaconda\Scripts (conda's bundled scripts)
E:\Anaconda\Library\mingw-w64\bin (for using C with Python)
E:\Anaconda\Library\usr\bin
E:\Anaconda\Library\bin (DLLs used by Jupyter Notebook)
https://i-blog.csdnimg.cn/direct/d6a8e105ff024bd18921f2615b541815.png
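To confirm the PATH entries took effect, here is a minimal check you can run from a Python started in a fresh terminal (the E:\Anaconda path is just the example location used above):
import shutil
import sys
print(sys.executable)          # should point into your Anaconda install, e.g. somewhere under E:\Anaconda
print(shutil.which("conda"))   # resolves only if E:\Anaconda\Scripts is on PATH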
3. PyTorch environment setup
Press the Start (Win) key and click the icon shown in the figure to open the Anaconda Prompt.
https://i-blog.csdnimg.cn/direct/d9dbc9a69e2d4ef8a985f8fb0a0736cf.png
Run the following command to list the existing environments:
conda env list
As you can see, a fresh Anaconda install has only the base environment.
https://i-blog.csdnimg.cn/direct/ab0c59f47b9541b4851b638b0a9bfae0.png
https://i-blog.csdnimg.cn/direct/fe9caf60fc8e4696a8f41b75a9f05be7.png
Change where new environments are stored (envs_dirs):
https://i-blog.csdnimg.cn/direct/2f7d5fe5d5204fe8ac28b7f328717479.png
The target folder also needs write permissions.
https://i-blog.csdnimg.cn/direct/9f97d5a0b0c94f6994232f1494dbfc7e.png
conda info
conda create -n rltorch python=3.10
https://i-blog.csdnimg.cn/direct/12732f23f6834b4086cfe969852a3191.png
Once creation finishes, run conda env list again and you will see a new environment named rltorch alongside base. We can now install the deep learning framework and other Python packages inside it.
Activate it with conda activate <environment name>:
conda activate rltorch
https://i-blog.csdnimg.cn/direct/378d48ad66c249609f7a2d78c93b7277.png
Next, install the GPU build of PyTorch. Because https://pytorch.org/ is hosted overseas, downloading the packages can be slow, so we switch to a mirror. Inside the rltorch environment, run the following commands to add the Tsinghua channels:
https://i-blog.csdnimg.cn/direct/6983679a43d04ac588c07132cbc929d9.png
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/
conda config --set show_channel_urls yes
Then open the PyTorch website. Earlier, nvidia-smi showed that this RTX 2080 Ti driver supports CUDA up to 12.7, so either the CUDA 11.8 or the CUDA 12.1 build below will work. Copy the matching command, but do not copy the trailing -c pytorch (and -c nvidia) channel flags: with them the packages are still pulled from the overseas channels instead of the Tsinghua mirror, which stays slow.
# CUDA 11.8
conda install pytorch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 pytorch-cuda=11.8 -c pytorch -c nvidia
# CUDA 12.1
conda install pytorch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 pytorch-cuda=12.1 -c pytorch -c nvidia
# CPU Only
conda install pytorch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 cpuonly -c pytorch
The installation takes a while; be patient.
https://i-blog.csdnimg.cn/direct/4ab0bf45556b450ba2cd5a33fb613de2.png
pip list
https://i-blog.csdnimg.cn/direct/6e01063e5f044b90842a8880d302ea97.png
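To make sure the GPU build is actually being picked up (a quick sanity check, not part of the original tutorial), run the following inside the rltorch environment:
import torch
print(torch.__version__)               # expect 2.3.0
print(torch.cuda.is_available())       # should print True if the CUDA build matches the driver
print(torch.cuda.get_device_name(0))   # should name the RTX 2080 Ti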
Install the gym module: you can install gym with pip. The pygame module is used to display the training results interactively. Open a terminal or command prompt and run:
pip install gym
pip install pygame
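A quick import check (versions will vary; note that the project's RlGame environment follows the older gym interface, where reset() returns only the observation and step() returns a custom tuple):
import gym
import pygame
print("gym", gym.__version__)
print("pygame", pygame.version.ver)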
4. VSCode installation
We write code in VSCode. For a VSCode and Git setup guide, see: https://vor2345.blog.csdn.net/article/details/142727918
5. Reproducing the GitHub demo
We reproduce an open-source paper from Northwestern Polytechnical University, 《基于MASAC强化学习算法的多无人机协同路径规划》 (multi-UAV cooperative path planning based on the MASAC reinforcement learning algorithm), DOI: https://doi.org/10.1360/SSI-2024-0050
GitHub repository: https://github.com/henbudidiao/UAV-path-planning
Open Git Bash in VSCode and clone the repository:
git clone "https://github.com/henbudidiao/UAV-path-planning.git"
Open the 'motion plan' project folder.
https://i-blog.csdnimg.cn/direct/a90db93052254d69a540e9407b169fe5.png
Four places in the code need attention:
5.1. main_SAC.py
This is the reinforcement learning method designed in the paper.
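Before the full listing, a compact summary of the per-agent update that the active ("方法零") branch below implements; the notation is my own reading of the code, not taken from the paper, and like the code it omits the (1 - done) mask that the commented-out variants include:
$$y_i = r_i + \gamma\left(\min\big(\bar Q_1(s',\tilde a_i'),\,\bar Q_2(s',\tilde a_i')\big) - \alpha_i \log \pi_i(\tilde a_i'\mid s_i')\right)$$
$$L_{\text{critic}} = \mathrm{MSE}\big(Q_1(s,a_i),y_i\big) + \mathrm{MSE}\big(Q_2(s,a_i),y_i\big),\qquad L_{\text{actor}} = \mathbb{E}\big[\alpha_i \log \pi_i(\tilde a_i\mid s_i) - \min\big(Q_1(s,\tilde a_i),Q_2(s,\tilde a_i)\big)\big]$$
$$L_{\alpha_i} = -\mathbb{E}\big[\alpha_i\,(\log\pi_i + \mathcal{H}_{\text{target}})\big]$$
Here s is the joint state of all five UAVs (the critics are centralized), while s_i and a_i are agent i's own 7-dimensional observation and 2-dimensional action. The full main_SAC.py listing follows.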
# -*- coding: utf-8 -*-
#开发者:Bright Fang
#开发时间:2023/7/30 18:13
from rl_env.path_env import RlGame
# import pygame
# from assignment import constants as C
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from matplotlib import pyplot as plt
import os
import pickle as pkl
filedir = os.path.dirname(__file__)
shoplistfile = filedir + "\\MASAC_new1"#保存文件数据所在文件的文件名
shoplistfile_test = filedir + "\\MASAC_d_test2"#保存文件数据所在文件的文件名
shoplistfile_test1 = filedir + "\\MASAC_compare"#保存文件数据所在文件的文件名
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
N_Agent=1
M_Enemy=4
RENDER=True
TRAIN_NUM = 1
TEST_EPIOSDE=100
env = RlGame(n=N_Agent,m=M_Enemy,render=RENDER).unwrapped
state_number=7
action_number=env.action_space.shape[0]
max_action = env.action_space.high
min_action = env.action_space.low
EP_MAX = 500
EP_LEN = 1000
GAMMA = 0.9
q_lr = 3e-4
value_lr = 3e-3
policy_lr = 1e-3
BATCH = 128
tau = 1e-2
MemoryCapacity=20000
Switch=1
class Ornstein_Uhlenbeck_Noise:
def __init__(self, mu, sigma=0.1, theta=0.1, dt=1e-2, x0=None):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.reset()
def __call__(self):
x = self.x_prev + \
self.theta * (self.mu - self.x_prev) * self.dt + \
self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
'''
后两行是dXt,其中后两行的前一行是θ(μ-Xt)dt,后一行是σεsqrt(dt)
'''
self.x_prev = x
return x
def reset(self):
if self.x0 is not None:
self.x_prev = self.x0
else:
self.x_prev = np.zeros_like(self.mu)
class ActorNet(nn.Module):
def __init__(self,inp,outp):
super(ActorNet, self).__init__()
self.in_to_y1=nn.Linear(inp,256)
self.in_to_y1.weight.data.normal_(0,0.1)
self.y1_to_y2=nn.Linear(256,256)
self.y1_to_y2.weight.data.normal_(0,0.1)
self.out=nn.Linear(256,outp)
self.out.weight.data.normal_(0,0.1)
self.std_out = nn.Linear(256, outp)
self.std_out.weight.data.normal_(0, 0.1)
def forward(self,inputstate):
inputstate=self.in_to_y1(inputstate)
inputstate=F.relu(inputstate)
inputstate=self.y1_to_y2(inputstate)
inputstate=F.relu(inputstate)
mean=max_action*torch.tanh(self.out(inputstate))#输出概率分布的均值mean
log_std=self.std_out(inputstate)#softplus激活函数的值域>0
log_std=torch.clamp(log_std,-20,2)
std=log_std.exp()
return mean,std
class CriticNet(nn.Module):
def __init__(self,input,output):
super(CriticNet, self).__init__()
#q1
self.in_to_y1=nn.Linear(input+output,256)
self.in_to_y1.weight.data.normal_(0,0.1)
self.y1_to_y2=nn.Linear(256,256)
self.y1_to_y2.weight.data.normal_(0,0.1)
self.out=nn.Linear(256,1)
self.out.weight.data.normal_(0,0.1)
#q2
self.q2_in_to_y1 = nn.Linear(input+output, 256)
self.q2_in_to_y1.weight.data.normal_(0, 0.1)
self.q2_y1_to_y2 = nn.Linear(256, 256)
self.q2_y1_to_y2.weight.data.normal_(0, 0.1)
self.q2_out = nn.Linear(256, 1)
self.q2_out.weight.data.normal_(0, 0.1)
def forward(self,s,a):
inputstate = torch.cat((s, a), dim=1)
#q1
q1=self.in_to_y1(inputstate)
q1=F.relu(q1)
q1=self.y1_to_y2(q1)
q1=F.relu(q1)
q1=self.out(q1)
#q2
q2 = self.q2_in_to_y1(inputstate)
q2 = F.relu(q2)
q2 = self.q2_y1_to_y2(q2)
q2 = F.relu(q2)
q2 = self.q2_out(q2)
return q1,q2
class Memory():
def __init__(self,capacity,dims):
self.capacity=capacity
self.mem=np.zeros((capacity,dims))
self.memory_counter=0
'''存储记忆'''
def store_transition(self,s,a,r,s_):
tran = np.hstack((s, a,r, s_))# 把s,a,r,s_困在一起,水平拼接
index = self.memory_counter % self.capacity#除余得索引
self.mem[index, :] = tran# 给索引存值,第index行所有列都为其中一次的s,a,r,s_;mem会是一个capacity行,(s+a+r+s_)列的数组
self.memory_counter+=1
'''随机从记忆库里抽取'''
def sample(self,n):
assert self.memory_counter>=self.capacity,'记忆库没有存满记忆'
sample_index = np.random.choice(self.capacity, n)#从capacity个记忆里随机抽取n个为一批,可得到抽样后的索引号
new_mem = self.mem[sample_index, :]#由抽样得到的索引号在所有的capacity个记忆中得到记忆s,a,r,s_
return new_mem
class Actor():
def __init__(self):
self.action_net=ActorNet(state_number,action_number)#这只是均值mean
self.optimizer=torch.optim.Adam(self.action_net.parameters(),lr=policy_lr)
def choose_action(self,s):
inputstate = torch.FloatTensor(s)
mean,std=self.action_net(inputstate)
dist = torch.distributions.Normal(mean, std)
action=dist.sample()
action=torch.clamp(action,min_action,max_action)
return action.detach().numpy()
def evaluate(self,s):
inputstate = torch.FloatTensor(s)
mean,std=self.action_net(inputstate)
dist = torch.distributions.Normal(mean, std)
noise = torch.distributions.Normal(0, 1)
z = noise.sample()
action=torch.tanh(mean+std*z)
action=torch.clamp(action,min_action,max_action)
action_logprob=dist.log_prob(mean+std*z)-torch.log(1-action.pow(2)+1e-6)
return action,action_logprob
def learn(self,actor_loss):
loss=actor_loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
class Entroy():
def __init__(self):
self.target_entropy = -0.1
self.log_alpha = torch.zeros(1, requires_grad=True)
self.alpha = self.log_alpha.exp()
self.optimizer = torch.optim.Adam([self.log_alpha], lr=q_lr)
def learn(self,entroy_loss):
loss=entroy_loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
class Critic():
def __init__(self):
self.critic_v,self.target_critic_v=CriticNet(state_number*(N_Agent+M_Enemy),action_number),CriticNet(state_number*(N_Agent+M_Enemy),action_number)#改网络输入状态,生成一个Q值
self.target_critic_v.load_state_dict(self.critic_v.state_dict())
self.optimizer = torch.optim.Adam(self.critic_v.parameters(), lr=value_lr,eps=1e-5)
self.lossfunc = nn.MSELoss()
def soft_update(self):
for target_param, param in zip(self.target_critic_v.parameters(), self.critic_v.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)
def get_v(self,s,a):
return self.critic_v(s,a)
def target_get_v(self,s,a):
return self.target_critic_v(s,a)
def learn(self,current_q1,current_q2,target_q):
loss = self.lossfunc(current_q1, target_q) + self.lossfunc(current_q2, target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def main():
run(env)
def run(env):
if Switch==0:
try:
assert M_Enemy == 1
except:
print('程序终止,被逮到~嘿嘿,哥们儿预判到你会犯错,这段程序中变量\'M_Enemy\'的值必须为1,请把它的值改为1。\n'
'改为1之后程序一定会报错,这是因为组数越界,更改path_env.py文件中的跟随者无人机初始化个数;删除多余的\n'
'求距离函数,即变量dis_1_agent_0_to_3等,以及提到变量dis_1_agent_0_to_3等的地方;删除画无人机轨迹的\n'
'函数;删除step函数的最后一个返回值dis_1_agent_0_to_1;将player.py文件中的变量dt改为1;即可开始训练!\n'
'如果实在不会改也无妨,我会在不久之后出一个视频来手把手教大伙怎么改,可持续关注此项目github中的README文件。\n')
else:
print('SAC训练中...')
all_ep_r = [[] for i in range(TRAIN_NUM)]
all_ep_r0 = [[] for i in range(TRAIN_NUM)]
all_ep_r1 = [[] for i in range(TRAIN_NUM)]
for k in range(TRAIN_NUM):
actors = [None for _ in range(N_Agent+M_Enemy)]
critics = [None for _ in range(N_Agent+M_Enemy)]
entroys = [None for _ in range(N_Agent+M_Enemy)]
for i in range(N_Agent+M_Enemy):
actors[i] = Actor()
critics[i] = Critic()
entroys[i] = Entroy()
M = Memory(MemoryCapacity, 2 * state_number*(N_Agent+M_Enemy) + action_number*(N_Agent+M_Enemy) + 1*(N_Agent+M_Enemy))
ou_noise = Ornstein_Uhlenbeck_Noise(mu=np.zeros(((N_Agent+M_Enemy), action_number)))
action=np.zeros(((N_Agent+M_Enemy), action_number))
# aaa = np.zeros((N_Agent, state_number))
for episode in range(EP_MAX):
observation = env.reset()# 环境重置
reward_totle,reward_totle0,reward_totle1 = 0,0,0
for timestep in range(EP_LEN):
for i in range(N_Agent+M_Enemy):
action[i] = actors[i].choose_action(observation[i])
# action=actor0.choose_action(observation)
# action = actor0.choose_action(observation)
if episode <= 20:
noise = ou_noise()
else:
noise = 0
action = action + noise
action = np.clip(action, -max_action, max_action)
observation_, reward,done,win,team_counter= env.step(action)# 单步交互
M.store_transition(observation.flatten(), action.flatten(), reward.flatten(), observation_.flatten())
# 记忆库存储
# 有的2000个存储数据就开始学习
if M.memory_counter > MemoryCapacity:
b_M = M.sample(BATCH)
b_s = b_M[:, :state_number*(N_Agent+M_Enemy)]
b_a = b_M[:, state_number*(N_Agent+M_Enemy): state_number*(N_Agent+M_Enemy) + action_number*(N_Agent+M_Enemy)]
b_r = b_M[:, -state_number*(N_Agent+M_Enemy) - 1*(N_Agent+M_Enemy): -state_number*(N_Agent+M_Enemy)]
b_s_ = b_M[:, -state_number*(N_Agent+M_Enemy):]
b_s = torch.FloatTensor(b_s)
b_a = torch.FloatTensor(b_a)
b_r = torch.FloatTensor(b_r)
b_s_ = torch.FloatTensor(b_s_)
# if not done:
# new_action_0, log_prob_0 = actor0.evaluate(b_s_[:, 0:state_number])
# target_q10, target_q20 = critic0.target_critic_v(b_s_[:, 0:state_number], new_action_0)
# target_q0 = b_r[:, 0:1] + GAMMA * (1 - b_done) *(torch.min(target_q10, target_q20) - entroy0.alpha * log_prob_0)
# current_q10, current_q20 = critic0.get_v(b_s[:,0:state_number], b_a[:, 0:action_number*1])
# critic0.learn(current_q10, current_q20, target_q0.detach())
# a0, log_prob0 = actor0.evaluate(b_s[:, 0:state_number*1])
# q10, q20 = critic0.get_v(b_s[:, 0:state_number*1], a0)
# q0 = torch.min(q10, q20)
# actor_loss0 = (entroy0.alpha * log_prob0 - q0).mean()
# alpha_loss0 = -(entroy0.log_alpha.exp() * (
# log_prob0 + entroy0.target_entropy).detach()).mean()
# actor0.learn(actor_loss0)
# entroy0.learn(alpha_loss0)
# entroy0.alpha = entroy0.log_alpha.exp()
# # 软更新
# critic0.soft_update()
# if not done:
# new_action_1, log_prob_1 = actor1.evaluate(b_s_[:, state_number:state_number*2])
# target_q11, target_q21 = critic1.target_critic_v(b_s_[:, state_number:state_number*2], new_action_1)
# target_q1= b_r[:, 1:2] + GAMMA * (1 - b_done)*(torch.min(target_q11, target_q21) - entroy1.alpha * log_prob_1)
# current_q11, current_q21 = critic1.get_v(b_s[:, state_number:state_number*2], b_a[:, action_number:action_number * 2])
# critic1.learn(current_q11, current_q21, target_q1.detach())
# a1, log_prob1 = actor1.evaluate(b_s[:, state_number:state_number*2])
# q11, q21 = critic1.get_v(b_s[:, state_number:state_number*2], a1)
# q1 = torch.min(q11, q21)
# actor_loss1 = (entroy1.alpha * log_prob1 - q1).mean()
# alpha_loss1 = -(entroy1.log_alpha.exp() * (
# log_prob1 + entroy1.target_entropy).detach()).mean()
# actor1.learn(actor_loss1)
# entroy1.learn(alpha_loss1)
# entroy1.alpha = entroy1.log_alpha.exp()
# # 软更新
# critic1.soft_update()
for i in range(N_Agent+M_Enemy):
# # # TODO 方法二
# new_action_0, log_prob_0 = actor0.evaluate(b_s_[:, :state_number])
# new_action_1, log_prob_1 = actor0.evaluate(b_s_[:, state_number:state_number * 2])
# new_action = torch.hstack((new_action_0, new_action_1))
# # new_action = torch.cat((new_action_0, new_action_1),dim=1)
# log_prob_ = (log_prob_0 + log_prob_1) / 2
# # log_prob_=torch.hstack((log_prob_0.mean(axis=1).unsqueeze(dim=1),log_prob_1.mean(axis=1).unsqueeze(dim=1)))
# target_q1, target_q2 = critic0.target_critic_v(b_s_, new_action)
#
# target_q = b_r + GAMMA * (torch.min(target_q1, target_q2) - entroy0.alpha * log_prob_)
#
# current_q1, current_q2 = critic0.get_v(b_s, b_a)
# critic0.learn(current_q1, current_q2, target_q.detach())
# a0, log_prob0 = actor0.evaluate(b_s[:, :state_number])
# a1, log_prob1 = actor0.evaluate(b_s[:, state_number:state_number * 2])
# a = torch.hstack((a0, a1))
# # a = torch.cat((a0, a1),dim=1)
# log_prob = (log_prob0 + log_prob1) / 2
# # log_prob = torch.hstack((log_prob0.mean(axis=1).unsqueeze(dim=1), log_prob1.mean(axis=1).unsqueeze(dim=1)))
# q1, q2 = critic0.get_v(b_s, a)
# q = torch.min(q1, q2)
#
# actor_loss = (entroy0.alpha * log_prob - q).mean()
# alpha_loss = -(entroy0.log_alpha.exp() * (log_prob + entroy0.target_entropy).detach()).mean()
#
# actor0.learn(actor_loss)
# # actor1.learn(actor_loss)
# entroy0.learn(alpha_loss)
# entroy0.alpha = entroy0.log_alpha.exp()
# # 软更新
# critic0.soft_update()
# TODO 方法零
# if not done:
new_action, log_prob_ = actors[i].evaluate(b_s_[:, state_number*i:state_number*(i+1)])
target_q1, target_q2 = critics[i].target_critic_v(b_s_, new_action)
target_q = b_r[:, i:(i+1)] + GAMMA * (torch.min(target_q1, target_q2) - entroys[i].alpha * log_prob_)
current_q1, current_q2 = critics[i].get_v(b_s, b_a[:, action_number*i:action_number*(i+1)])
critics[i].learn(current_q1, current_q2, target_q.detach())
a, log_prob = actors[i].evaluate(b_s[:, state_number*i:state_number*(i+1)])
q1, q2 = critics[i].get_v(b_s, a)
q = torch.min(q1, q2)
actor_loss = (entroys[i].alpha * log_prob - q).mean()
alpha_loss = -(entroys[i].log_alpha.exp() * (
log_prob + entroys[i].target_entropy).detach()).mean()
actors[i].learn(actor_loss)
entroys[i].learn(alpha_loss)
entroys[i].alpha = entroys[i].log_alpha.exp()
# 软更新
critics[i].soft_update()
# #TODO 方法一
# new_action_0, log_prob_0 = actors.evaluate(b_s_[:, :state_number])
# new_action_1, log_prob_1 = actors.evaluate(b_s_[:, state_number:state_number * 2])
# new_action = torch.hstack((new_action_0, new_action_1))
# # new_action = torch.cat((new_action_0, new_action_1),dim=1)
# # log_prob_ = (log_prob_0 + log_prob_1) / 2
# # log_prob_=torch.hstack((log_prob_0.mean(axis=1).unsqueeze(dim=1),log_prob_1.mean(axis=1).unsqueeze(dim=1)))
# target_q1, target_q2 = critics.target_critic_v(b_s_, new_action)
# if i==0:
# target_q = b_r[:, i:(i+1)] + GAMMA * (torch.min(target_q1, target_q2) - entroys.alpha * log_prob_0)
# elif i==1:
# target_q = b_r[:, i:(i+1)] + GAMMA * (torch.min(target_q1, target_q2) - entroys.alpha * log_prob_1)
# current_q1, current_q2 = critics.get_v(b_s, b_a)
# critics.learn(current_q1, current_q2, target_q.detach())
# a0, log_prob0 = actors.evaluate(b_s[:, :state_number])
# a1, log_prob1 = actors.evaluate(b_s[:, state_number:state_number * 2])
# a = torch.hstack((a0, a1))
# # a = torch.cat((a0, a1),dim=1)
# # log_prob = (log_prob0 + log_prob1) / 2
# # log_prob = torch.hstack((log_prob0.mean(axis=1).unsqueeze(dim=1), log_prob1.mean(axis=1).unsqueeze(dim=1)))
# q1, q2 = critics.get_v(b_s, a)
# q = torch.min(q1, q2)
# if i == 0:
# actor_loss = (entroys.alpha * log_prob0 - q).mean()
# alpha_loss = -(entroys.log_alpha.exp() * (log_prob0 + entroys.target_entropy).detach()).mean()
# elif i == 1:
# actor_loss = (entroys.alpha * log_prob1 - q).mean()
# alpha_loss = -(entroys.log_alpha.exp() * (log_prob1 + entroys.target_entropy).detach()).mean()
# actors.learn(actor_loss)
# entroys.learn(alpha_loss)
# entroys.alpha = entroys.log_alpha.exp()
# # 软更新
# critics.soft_update()
# #TODO 方法二
# new_action_0, log_prob_0 = actors.evaluate(b_s_[:, :state_number])
# new_action_1, log_prob_1 = actors.evaluate(b_s_[:, state_number:state_number * 2])
# new_action = torch.hstack((new_action_0, new_action_1))
# log_prob_ = (log_prob_0 + log_prob_1) / 2
# # log_prob_=torch.hstack((log_prob_0.mean(axis=1).unsqueeze(dim=1),log_prob_1.mean(axis=1).unsqueeze(dim=1)))
# target_q1, target_q2 = critics.target_critic_v(b_s_, new_action)
# target_q = b_r + GAMMA * (torch.min(target_q1, target_q2) - entroys.alpha * log_prob_)
# current_q1, current_q2 = critics.get_v(b_s, b_a)
# critics.learn(current_q1, current_q2, target_q.detach())
# a0, log_prob0 = actors.evaluate(b_s[:, :state_number])
# a1, log_prob1 = actors.evaluate(b_s[:, state_number:state_number * 2])
# a = torch.hstack((a0, a1))
# log_prob = (log_prob0 + log_prob1) / 2
# # log_prob = torch.hstack((log_prob0.mean(axis=1).unsqueeze(dim=1), log_prob1.mean(axis=1).unsqueeze(dim=1)))
# q1, q2 = critics.get_v(b_s, a)
# q = torch.min(q1, q2)
# actor_loss = ( entroys.alpha * log_prob - q).mean()
# actors.learn(actor_loss)
# alpha_loss = -( entroys.log_alpha.exp() * (log_prob +entroys.target_entropy).detach()).mean()
# entroys.learn(alpha_loss)
# entroys.alpha =entroys.log_alpha.exp()
# # 软更新
# critics.soft_update()
# new_action_0, log_prob_0 = actor.evaluate(b_s_[:, :state_number])
# new_action_1, log_prob_1 = actor.evaluate(b_s_[:, state_number:state_number*2])
# new_action=torch.hstack((new_action_0,new_action_1))
# log_prob_=(log_prob_0+log_prob_1)/2
# # log_prob_=torch.hstack((log_prob_0.mean(axis=1).unsqueeze(dim=1),log_prob_1.mean(axis=1).unsqueeze(dim=1)))
# target_q1,target_q2=critic.target_critic_v(b_s_,new_action)
# target_q=b_r+GAMMA*(torch.min(target_q1,target_q2)-entroy.alpha*log_prob_)
# current_q1, current_q2 = critic.get_v(b_s, b_a)
# critic.learn(current_q1,current_q2,target_q.detach())
# a0,log_prob0=actor.evaluate(b_s[:, :state_number])
# a1, log_prob1 = actor.evaluate(b_s[:, state_number:state_number*2])
# a = torch.hstack((a0, a1))
# log_prob=(log_prob0+log_prob1)/2
# # log_prob = torch.hstack((log_prob0.mean(axis=1).unsqueeze(dim=1), log_prob1.mean(axis=1).unsqueeze(dim=1)))
# q1,q2=critic.get_v(b_s,a)
# q=torch.min(q1,q2)
# actor_loss = (entroy.alpha * log_prob - q).mean()
# actor.learn(actor_loss)
# alpha_loss = -(entroy.log_alpha.exp() * (log_prob + entroy.target_entropy).detach()).mean()
# entroy.learn(alpha_loss)
# entroy.alpha=entroy.log_alpha.exp()
# # 软更新
# critic.soft_update()
observation = observation_
reward_totle += reward.mean()
reward_totle0 += float(reward[0])
reward_totle1 += float(reward[1])
if RENDER:
env.render()
if done:
break
print("Ep: {} rewards: {}".format(episode, reward_totle))
all_ep_r[k].append(reward_totle)
all_ep_r0[k].append(reward_totle0)
all_ep_r1[k].append(reward_totle1)
if episode % 20 == 0 and episode > 200:#保存神经网络参数
save_data = {'net': actors.action_net.state_dict(), 'opt': actors.optimizer.state_dict()}
torch.save(save_data, filedir + "\\Path_SAC_actor_L1.pth")
save_data = {'net': actors.action_net.state_dict(), 'opt': actors.optimizer.state_dict()}
torch.save(save_data, filedir + "\\Path_SAC_actor_F1.pth")
# plt.plot(np.arange(len(all_ep_r)), all_ep_r)
# plt.xlabel('Episode')
# plt.ylabel('Total reward')
# plt.figure(2, figsize=(8, 4), dpi=150)
# plt.plot(np.arange(len(all_ep_r0)), all_ep_r0)
# plt.xlabel('Episode')
# plt.ylabel('Leader reward')
# plt.figure(3, figsize=(8, 4), dpi=150)
# plt.plot(np.arange(len(all_ep_r1)), all_ep_r1)
# plt.xlabel('Episode')
# plt.ylabel('Follower reward')
# plt.show()
# env.close()
all_ep_r_mean = np.mean((np.array(all_ep_r)), axis=0)
all_ep_r_std = np.std((np.array(all_ep_r)), axis=0)
all_ep_L_mean = np.mean((np.array(all_ep_r0)), axis=0)
all_ep_L_std = np.std((np.array(all_ep_r0)), axis=0)
all_ep_F_mean = np.mean((np.array(all_ep_r1)), axis=0)
all_ep_F_std = np.std((np.array(all_ep_r1)), axis=0)
d = {"all_ep_r_mean": all_ep_r_mean, "all_ep_r_std": all_ep_r_std,
"all_ep_L_mean": all_ep_L_mean, "all_ep_L_std": all_ep_L_std,
"all_ep_F_mean": all_ep_F_mean, "all_ep_F_std": all_ep_F_std,}
f = open(shoplistfile, 'wb')# 二进制打开,如果找不到该文件,则创建一个
pkl.dump(d, f, pkl.HIGHEST_PROTOCOL)# 写入文件
f.close()
all_ep_r_max = all_ep_r_mean + all_ep_r_std * 0.95
all_ep_r_min = all_ep_r_mean - all_ep_r_std * 0.95
all_ep_L_max = all_ep_L_mean + all_ep_L_std * 0.95
all_ep_L_min = all_ep_L_mean - all_ep_L_std * 0.95
all_ep_F_max = all_ep_F_mean + all_ep_F_std * 0.95
all_ep_F_min = all_ep_F_mean - all_ep_F_std * 0.95
plt.margins(x=0)
plt.plot(np.arange(len(all_ep_r_mean)), all_ep_r_mean, label='MASAC', color='#e75840')
plt.fill_between(np.arange(len(all_ep_r_mean)), all_ep_r_max, all_ep_r_min, alpha=0.6, facecolor='#e75840')
plt.xlabel('Episode')
plt.ylabel('Total reward')
plt.figure(2, figsize=(8, 4), dpi=150)
plt.margins(x=0)
plt.plot(np.arange(len(all_ep_L_mean)), all_ep_L_mean, label='MASAC', color='#e75840')
plt.fill_between(np.arange(len(all_ep_L_mean)), all_ep_L_max, all_ep_L_min, alpha=0.6,
facecolor='#e75840')
plt.xlabel('Episode')
plt.ylabel('Leader reward')
plt.figure(3, figsize=(8, 4), dpi=150)
plt.margins(x=0)
plt.plot(np.arange(len(all_ep_F_mean)), all_ep_F_mean, label='MASAC', color='#e75840')
plt.fill_between(np.arange(len(all_ep_F_mean)), all_ep_F_max, all_ep_F_min, alpha=0.6,
facecolor='#e75840')
plt.xlabel('Episode')
plt.ylabel('Follower reward')
plt.legend()
plt.show()
env.close()
else:
print('SAC测试中...')
aa = Actor()
checkpoint_aa = torch.load(filedir + "\\Path_SAC_actor_L1.pth")
aa.action_net.load_state_dict(checkpoint_aa['net'])
bb = Actor()
checkpoint_bb = torch.load(filedir + "\\Path_SAC_actor_F1.pth")
bb.action_net.load_state_dict(checkpoint_bb['net'])
action = np.zeros((N_Agent+M_Enemy, action_number))
win_times = 0
average_FKR=0
average_timestep=0
average_integral_V=0
average_integral_U= 0
all_ep_V, all_ep_U, all_ep_T, all_ep_F = [], [], [], []
for j in range(TEST_EPIOSDE):
state = env.reset()
total_rewards = 0
integral_V=0
integral_U=0
v,v1,Dis=[],[],[]
for timestep in range(EP_LEN):
for i in range(N_Agent):
action[i] = aa.choose_action(state[i])
for i in range(M_Enemy):
action[i+N_Agent] = bb.choose_action(state[i+N_Agent])
# action = aa.choose_action(state)
# action = bb.choose_action(state)
new_state, reward,done,win,team_counter,dis = env.step(action)# 执行动作
if win:
win_times += 1
v.append(state*30)
v1.append(state*30)
Dis.append(dis)
integral_V+=state
integral_U+=abs(action).sum()
total_rewards += reward.mean()
state = new_state
if RENDER:
env.render()
if done:
break
FKR=team_counter/timestep
average_FKR += FKR
average_timestep += timestep
average_integral_V += integral_V
average_integral_U += integral_U
print("Score", total_rewards)
all_ep_V.append(integral_V)
all_ep_U.append(integral_U)
all_ep_T.append(timestep)
all_ep_F.append(FKR)
# print('最大编队保持率',FKR)
# print('最短飞行时间',timestep)
# print('最短飞行路程', integral_V)
# print('最小能量损耗', integral_U)
# d = {"leader": v, "follower": v1 }
# d = {"distance": Dis}
# f = open(shoplistfile_test, 'wb')# 二进制打开,如果找不到该文件,则创建一个
# pkl.dump(d, f, pkl.HIGHEST_PROTOCOL)# 写入文件
# f.close()
# plt.plot(np.arange(len(v)), v)
# plt.plot(np.arange(len(v1)), v1)
# plt.plot(np.arange(len(Dis)), Dis)
# plt.show()
print('任务完成率',win_times / TEST_EPIOSDE)
print('平均最大编队保持率', average_FKR/TEST_EPIOSDE)
print('平均最短飞行时间', average_timestep/TEST_EPIOSDE)
print('平均最短飞行路程', average_integral_V/TEST_EPIOSDE)
print('平均最小能量损耗', average_integral_U/TEST_EPIOSDE)
# d = {"all_ep_V": all_ep_V, "all_ep_U": all_ep_U, "all_ep_T": all_ep_T, "all_ep_F": all_ep_F, }
# f = open(shoplistfile_test1, 'wb')# 二进制打开,如果找不到该文件,则创建一个
# pkl.dump(d, f, pkl.HIGHEST_PROTOCOL)# 写入文件
# f.close()
env.close()
if __name__ == '__main__':
main()
5.2 main_DDPG.py
This is one of the baseline reinforcement learning methods that the paper compares against:
# -*- coding: utf-8 -*-
#开发者:Bright Fang
#开发时间:2023/7/20 23:34
from rl_env.path_env import RlGame
# import pygame
# from assignment import constants as C
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from matplotlib import pyplot as plt
import os
import pickle as pkl
filedir = os.path.dirname(__file__)
shoplistfile = filedir + "\\MADDPG"#保存文件数据所在文件的文件名
shoplistfile_test = filedir + "\\MADDPG_compare"#保存文件数据所在文件的文件名
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
N_Agent=1
M_Enemy=4
RENDER=True
env = RlGame(n=N_Agent,m=M_Enemy,render=RENDER).unwrapped
state_number=7
TEST_EPIOSDE=100
TRAIN_NUM = 3
EP_LEN = 1000
EPIOSDE_ALL=500
action_number=env.action_space.shape[0]
max_action = env.action_space.high
min_action = env.action_space.low
LR_A = 1e-3 # learning rate for actor
LR_C = 1e-3 # learning rate for critic
GAMMA = 0.95
MemoryCapacity=20000
Batch=128
Switch=1
tau = 0.005
'''DDPG第一步 设计A-C框架的Actor(DDPG算法,只有critic的部分才会用到记忆库)'''
'''第一步 设计A-C框架形式的网络部分'''
class ActorNet(nn.Module):
def __init__(self,inp,outp):
super(ActorNet, self).__init__()
self.in_to_y1=nn.Linear(inp,50)
self.in_to_y1.weight.data.normal_(0,0.1)
self.y1_to_y2=nn.Linear(50,20)
self.y1_to_y2.weight.data.normal_(0,0.1)
self.out=nn.Linear(20,outp)
self.out.weight.data.normal_(0,0.1)
def forward(self,inputstate):
inputstate=self.in_to_y1(inputstate)
inputstate=F.relu(inputstate)
inputstate=self.y1_to_y2(inputstate)
inputstate=torch.sigmoid(inputstate)
act=max_action*torch.tanh(self.out(inputstate))
# return F.softmax(act,dim=-1)
return act
class CriticNet(nn.Module):
def __init__(self,input,output):
super(CriticNet, self).__init__()
self.in_to_y1=nn.Linear(input+output,40)
self.in_to_y1.weight.data.normal_(0,0.1)
self.y1_to_y2=nn.Linear(40,20)
self.y1_to_y2.weight.data.normal_(0,0.1)
self.out=nn.Linear(20,1)
self.out.weight.data.normal_(0,0.1)
def forward(self,s,a):
inputstate = torch.cat((s, a), dim=1)
inputstate=self.in_to_y1(inputstate)
inputstate=F.relu(inputstate)
inputstate=self.y1_to_y2(inputstate)
inputstate=torch.sigmoid(inputstate)
Q=self.out(inputstate)
return Q
class Actor():
def __init__(self):
self.actor_estimate_eval,self.actor_reality_target = ActorNet(state_number,action_number),ActorNet(state_number,action_number)
self.optimizer = torch.optim.Adam(self.actor_estimate_eval.parameters(), lr=LR_A)
'''第二步 编写根据状态选择动作的函数'''
def choose_action(self, s):
inputstate = torch.FloatTensor(s)
probs = self.actor_estimate_eval(inputstate)
return probs.detach().numpy()
'''第四步 编写A的学习函数'''
'''生成输入为s的actor估计网络,用于传给critic估计网络,虽然这与choose_action函数一样,但如果直接用choose_action
函数生成的动作,DDPG是不会收敛的,原因在于choose_action函数生成的动作经过了记忆库,动作从记忆库出来后,动作的梯度数据消失了
所以再次编写了learn_a函数,它生成的动作没有过记忆库,是带有梯度的'''
def learn_a(self, s):
s = torch.FloatTensor(s)
A_prob = self.actor_estimate_eval(s)
return A_prob
'''把s_输入给actor现实网络,生成a_,a_将会被传给critic的实现网络'''
def learn_a_(self, s_):
s_ = torch.FloatTensor(s_)
A_prob=self.actor_reality_target(s_).detach()
return A_prob
'''actor的学习函数接受来自critic估计网络算出的Q_estimate_eval当做自己的loss,即负的critic_estimate_eval(s,a),使loss
最小化,即最大化critic网络生成的价值'''
def learn(self, a_loss):
loss = a_loss
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
'''第六步,最后一步编写软更新程序,Actor部分与critic部分都会有软更新代码'''
'''DQN是硬更新,即固定时间更新,而DDPG采用软更新,w_老_现实=τ*w_新_估计+(1-τ)w_老_现实'''
def soft_update(self):
for target_param, param in zip(self.actor_reality_target.parameters(), self.actor_estimate_eval.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)
class Critic():
def __init__(self):
self.critic_estimate_eval,self.critic_reality_target=CriticNet(state_number*(N_Agent+M_Enemy),action_number),CriticNet(state_number*(N_Agent+M_Enemy),action_number)
self.optimizer = torch.optim.Adam(self.critic_estimate_eval.parameters(), lr=LR_C)
self.lossfun=nn.MSELoss()
'''第五步 编写critic的学习函数'''
'''使用critic估计网络得到 actor的loss,这里的输入参数a是带梯度的'''
def learn_loss(self, s, a):
s = torch.FloatTensor(s)
# a = a.view(-1, 1)
Q_estimate_eval = -self.critic_estimate_eval(s, a).mean()
return Q_estimate_eval
'''这里的输入参数a与a_是来自记忆库的,不带梯度,根据公式我们会得到critic的loss'''
def learn(self, s, a, r, s_, a_):
s = torch.FloatTensor(s)
a = torch.FloatTensor(a)#当前动作a来自记忆库
r = torch.FloatTensor(r)
s_ = torch.FloatTensor(s_)
# a_ = a_.view(-1, 1)# view中一个参数定为-1,代表动态调整这个维度上的元素个数,以保证元素的总数不变
Q_estimate_eval = self.critic_estimate_eval(s, a)
Q_next = self.critic_reality_target(s_, a_).detach()
Q_reality_target = r + GAMMA * Q_next
loss = self.lossfun(Q_estimate_eval, Q_reality_target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def soft_update(self):
for target_param, param in zip(self.critic_reality_target.parameters(), self.critic_estimate_eval.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)
'''第三步建立记忆库'''
class Memory():
def __init__(self,capacity,dims):
self.capacity=capacity
self.mem=np.zeros((capacity,dims))
self.memory_counter=0
'''存储记忆'''
def store_transition(self,s,a,r,s_):
tran = np.hstack((s, a,r, s_))# 把s,a,r,s_困在一起,水平拼接
index = self.memory_counter % self.capacity#除余得索引
self.mem[index, :] = tran# 给索引存值,第index行所有列都为其中一次的s,a,r,s_;mem会是一个capacity行,(s+a+r+s_)列的数组
self.memory_counter+=1
'''随机从记忆库里抽取'''
def sample(self,n):
assert self.memory_counter>=self.capacity,'记忆库没有存满记忆'
sample_index = np.random.choice(self.capacity, n)#从capacity个记忆里随机抽取n个为一批,可得到抽样后的索引号
new_mem = self.mem[sample_index, :]#由抽样得到的索引号在所有的capacity个记忆中得到记忆s,a,r,s_
return new_mem
'''OU噪声'''
class Ornstein_Uhlenbeck_Noise:
def __init__(self, mu, sigma=0.1, theta=0.1, dt=1e-2, x0=None):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.reset()
def __call__(self):
x = self.x_prev + \
self.theta * (self.mu - self.x_prev) * self.dt + \
self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
'''
后两行是dXt,其中后两行的前一行是θ(μ-Xt)dt,后一行是σεsqrt(dt)
'''
self.x_prev = x
return x
def reset(self):
if self.x0 is not None:
self.x_prev = self.x0
else:
self.x_prev = np.zeros_like(self.mu)
def main():
os.environ["SDL_VIDEODRIVER"] = "dummy"
run(env)
def run(env):
if Switch==0:
all_ep_r = [[] for i in range(TRAIN_NUM)]
all_ep_r0 = [[] for i in range(TRAIN_NUM)]
all_ep_r1 = [[] for i in range(TRAIN_NUM)]
for k in range(TRAIN_NUM):
actors = [None for _ in range(N_Agent + M_Enemy)]
critics = [None for _ in range(N_Agent + M_Enemy)]
for i in range(N_Agent + M_Enemy):
actors[i] = Actor()
critics[i] = Critic()
M = Memory(MemoryCapacity, 2 * state_number*(N_Agent+M_Enemy) + action_number*(N_Agent+M_Enemy) + 1*(N_Agent+M_Enemy))# 奖惩是一个浮点数
ou_noise = Ornstein_Uhlenbeck_Noise(mu=np.zeros(((N_Agent+M_Enemy), action_number)))
action = np.zeros(((N_Agent + M_Enemy), action_number))
# all_ep_r = []
for episode in range(EPIOSDE_ALL):
observation=env.reset()
reward_totle, reward_totle0, reward_totle1 = 0, 0, 0
for timestep in range(EP_LEN):
for i in range(N_Agent + M_Enemy):
action[i] = actors[i].choose_action(observation[i])
# action=actor0.choose_action(observation)
# action = actor0.choose_action(observation)
if episode <= 50:
noise = ou_noise()
else:
noise = 0
action = action + noise
action = np.clip(action, -max_action, max_action)
observation_, reward,done,win,team_counter = env.step(action)# 单步交互
M.store_transition(observation.flatten(), action.flatten(), reward.flatten()/1000, observation_.flatten())
if M.memory_counter > MemoryCapacity:
b_M = M.sample(Batch)
b_s = b_M[:, :state_number*(N_Agent+M_Enemy)]
b_a = b_M[:,
state_number * (N_Agent + M_Enemy): state_number * (N_Agent + M_Enemy) + action_number * (
N_Agent + M_Enemy)]
b_r = b_M[:, -state_number * (N_Agent + M_Enemy) - 1 * (N_Agent + M_Enemy): -state_number * (
N_Agent + M_Enemy)]
b_s_ = b_M[:, -state_number * (N_Agent + M_Enemy):]
for i in range(N_Agent + M_Enemy):
actor_action_0 = actors[i].learn_a(b_s[:, state_number*i:state_number*(i+1)])
# actor_action_1 = actors.learn_a(b_s[:, state_number:state_number * 2])
# actor_action = torch.hstack((actor_action_0, actor_action_1))
actor_action_0_ = actors[i].learn_a_(b_s_[:, state_number*i:state_number*(i+1)])
# actor_action_1_ = actors.learn_a_(b_s_[:, state_number:state_number * 2])
# actor_action_ = torch.hstack((actor_action_0_, actor_action_1_))
critics[i].learn(b_s, b_a[:, action_number*i:action_number*(i+1)], b_r, b_s_, actor_action_0_)
Q_c_to_a_loss = critics[i].learn_loss(b_s, actor_action_0)
actors[i].learn(Q_c_to_a_loss)
# 软更新
actors[i].soft_update()
critics[i].soft_update()
observation = observation_
reward_totle += reward.mean()
reward_totle0 += float(reward[0])
reward_totle1 += float(reward[1])
if RENDER:
env.render()
if done:
break
print('Episode {},奖励:{}'.format(episode, reward_totle))
# all_ep_r.append(reward_totle)
all_ep_r[k].append(reward_totle)
all_ep_r0[k].append(reward_totle0)
all_ep_r1[k].append(reward_totle1)
if episode % 50 == 0 and episode > 200:#保存神经网络参数
save_data = {'net': actors.actor_estimate_eval.state_dict(), 'opt': actors.optimizer.state_dict()}
torch.save(save_data, filedir + "\\Path_DDPG_actor_new.pth")
save_data = {'net': actors.actor_estimate_eval.state_dict(), 'opt': actors.optimizer.state_dict()}
torch.save(save_data, filedir + "\\Path_DDPG_actor_1_new.pth")
# plt.plot(np.arange(len(all_ep_r)), all_ep_r)
# plt.xlabel('Episode')
# plt.ylabel('Moving averaged episode reward')
# plt.show()
# env.close()
all_ep_r_mean = np.mean((np.array(all_ep_r)), axis=0)
all_ep_r_std = np.std((np.array(all_ep_r)), axis=0)
all_ep_L_mean = np.mean((np.array(all_ep_r0)), axis=0)
all_ep_L_std = np.std((np.array(all_ep_r0)), axis=0)
all_ep_F_mean = np.mean((np.array(all_ep_r1)), axis=0)
all_ep_F_std = np.std((np.array(all_ep_r1)), axis=0)
d = {"all_ep_r_mean": all_ep_r_mean, "all_ep_r_std": all_ep_r_std,
"all_ep_L_mean": all_ep_L_mean, "all_ep_L_std": all_ep_L_std,
"all_ep_F_mean": all_ep_F_mean, "all_ep_F_std": all_ep_F_std,}
f = open(shoplistfile, 'wb')# 二进制打开,如果找不到该文件,则创建一个
pkl.dump(d, f, pkl.HIGHEST_PROTOCOL)# 写入文件
f.close()
all_ep_r_max = all_ep_r_mean + all_ep_r_std * 0.95
all_ep_r_min = all_ep_r_mean - all_ep_r_std * 0.95
all_ep_L_max = all_ep_L_mean + all_ep_L_std * 0.95
all_ep_L_min = all_ep_L_mean - all_ep_L_std * 0.95
all_ep_F_max = all_ep_F_mean + all_ep_F_std * 0.95
all_ep_F_min = all_ep_F_mean - all_ep_F_std * 0.95
plt.margins(x=0)
plt.plot(np.arange(len(all_ep_r_mean)), all_ep_r_mean, label='MADDPG', color='#e75840')
plt.fill_between(np.arange(len(all_ep_r_mean)), all_ep_r_max, all_ep_r_min, alpha=0.6, facecolor='#e75840')
plt.xlabel('Episode')
plt.ylabel('Total reward')
plt.figure(2, figsize=(8, 4), dpi=150)
plt.margins(x=0)
plt.plot(np.arange(len(all_ep_L_mean)), all_ep_L_mean, label='MADDPG', color='#e75840')
plt.fill_between(np.arange(len(all_ep_L_mean)), all_ep_L_max, all_ep_L_min, alpha=0.6,
facecolor='#e75840')
plt.xlabel('Episode')
plt.ylabel('Leader reward')
plt.figure(3, figsize=(8, 4), dpi=150)
plt.margins(x=0)
plt.plot(np.arange(len(all_ep_F_mean)), all_ep_F_mean, label='MADDPG', color='#e75840')
plt.fill_between(np.arange(len(all_ep_F_mean)), all_ep_F_max, all_ep_F_min, alpha=0.6,
facecolor='#e75840')
plt.xlabel('Episode')
plt.ylabel('Follower reward')
plt.legend()
plt.show()
env.close()
else:
print('MADDPG测试中...')
aa = Actor()
checkpoint_aa = torch.load(filedir + "\\Path_DDPG_actor_new.pth")
aa.actor_estimate_eval.load_state_dict(checkpoint_aa['net'])
bb = Actor()
checkpoint_bb = torch.load(filedir + "\\Path_DDPG_actor_1_new.pth")
bb.actor_estimate_eval.load_state_dict(checkpoint_bb['net'])
action = np.zeros((N_Agent + M_Enemy, action_number))
win_times = 0
average_FKR = 0
average_timestep = 0
average_integral_V = 0
average_integral_U = 0
all_ep_V, all_ep_U, all_ep_T, all_ep_F = [], [], [], []
for j in range(TEST_EPIOSDE):
state = env.reset()
total_rewards = 0
integral_V = 0
integral_U = 0
v, v1 = [], []
for timestep in range(EP_LEN):
for i in range(N_Agent):
action[i] = aa.choose_action(state[i])
for i in range(M_Enemy):
action[i+N_Agent] = bb.choose_action(state[i+N_Agent])
# action = aa.choose_action(state)
# action = bb.choose_action(state)
new_state, reward, done, win, team_counter,d = env.step(action)# 执行动作
if win:
win_times += 1
v.append(state)
v1.append(state)
integral_V += state
integral_U += abs(action).sum()
total_rewards += reward.mean()
state = new_state
if RENDER:
env.render()
if done:
break
FKR = team_counter / timestep
average_FKR += FKR
average_timestep += timestep
average_integral_V += integral_V
average_integral_U += integral_U
print("Score", total_rewards)
all_ep_V.append(integral_V)
all_ep_U.append(integral_U)
all_ep_T.append(timestep)
all_ep_F.append(FKR)
# print('最大编队保持率',FKR)
# print('最短飞行时间',timestep)
# print('最短飞行路程', integral_V)
# print('最小能量损耗', integral_U)
# plt.plot(np.arange(len(v)), v)
# plt.plot(np.arange(len(v1)), v1)
# plt.show()
print('任务完成率', win_times / TEST_EPIOSDE)
print('平均最大编队保持率', average_FKR / TEST_EPIOSDE)
print('平均最短飞行时间', average_timestep / TEST_EPIOSDE)
print('平均最短飞行路程', average_integral_V / TEST_EPIOSDE)
print('平均最小能量损耗', average_integral_U / TEST_EPIOSDE)
d = {"all_ep_V": all_ep_V, "all_ep_U": all_ep_U, "all_ep_T": all_ep_T, "all_ep_F": all_ep_F, }
f = open(shoplistfile_test, 'wb')# 二进制打开,如果找不到该文件,则创建一个
pkl.dump(d, f, pkl.HIGHEST_PROTOCOL)# 写入文件
f.close()
env.close()
if __name__ == '__main__':
main()
5.3 main.py
This is the author's demo script; as listed here it rolls out random actions in the environment ('随机测试中...'), and the commented-out lines show where the trained actors would be plugged in to evaluate saved weights:
# -*- coding: utf-8 -*-
#开发者:Bright Fang
#开发时间:2023/7/30 18:13
from rl_env.path_env import RlGame
# import pygame
# from assignment import constants as C
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from matplotlib import pyplot as plt
import os
import pickle as pkl
shoplistfile_test = "MASAC_compare"#保存文件数据所在文件的文件名
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
N_Agent=1
M_Enemy=4
RENDER=True
TRAIN_NUM = 1
TEST_EPIOSDE=100
env = RlGame(n=N_Agent,m=M_Enemy,render=RENDER).unwrapped
state_number=7
action_number=env.action_space.shape[0]
max_action = env.action_space.high
min_action = env.action_space.low
EP_MAX = 500
EP_LEN = 1000
def main():
run(env)
def run(env):
print('随机测试中...')
action = np.zeros((N_Agent+M_Enemy, action_number))
win_times = 0
average_FKR=0
average_timestep=0
average_integral_V=0
average_integral_U= 0
all_ep_V,all_ep_U,all_ep_T,all_ep_F=[],[],[],[]
for j in range(TEST_EPIOSDE):
state = env.reset()
total_rewards = 0
integral_V=0
integral_U=0
v,v1=[],[]
for timestep in range(EP_LEN):
for i in range(N_Agent+M_Enemy):
action[i] = env.action_space.sample()
# action = aa.choose_action(state)
# action = bb.choose_action(state)
new_state, reward,done,win,team_counter,d = env.step(action)# 执行动作
if win:
win_times += 1
v.append(state)
v1.append(state)
integral_V+=state
integral_U+=abs(action).sum()
total_rewards += reward.mean()
state = new_state
if RENDER:
env.render()
if done:
break
FKR=team_counter/timestep
average_FKR += FKR
average_timestep += timestep
average_integral_V += integral_V
average_integral_U += integral_U
print("Score", total_rewards)
all_ep_V.append(integral_V)
all_ep_U.append(integral_U)
all_ep_T.append(timestep)
all_ep_F.append(FKR)
# print('最大编队保持率',FKR)
# print('最短飞行时间',timestep)
# print('最短飞行路程', integral_V)
# print('最小能量损耗', integral_U)
# plt.plot(np.arange(len(v)), v)
# plt.plot(np.arange(len(v1)), v1)
# plt.show()
print('任务完成率',win_times / TEST_EPIOSDE)
print('平均最大编队保持率', average_FKR/TEST_EPIOSDE)
print('平均最短飞行时间', average_timestep/TEST_EPIOSDE)
print('平均最短飞行路程', average_integral_V/TEST_EPIOSDE)
print('平均最小能量损耗', average_integral_U/TEST_EPIOSDE)
# d = {"all_ep_V": all_ep_V,"all_ep_U": all_ep_U,"all_ep_T": all_ep_T,"all_ep_F": all_ep_F,}
# f = open(shoplistfile_test, 'wb')# 二进制打开,如果找不到该文件,则创建一个
# pkl.dump(d, f, pkl.HIGHEST_PROTOCOL)# 写入文件
# f.close()
env.close()
if __name__ == '__main__':
main()
The fourth place is the environment configuration path:
https://i-blog.csdnimg.cn/direct/1de09e701cd545b28ce1ee0dd10706fe.png
5.4. path_env.py
The classes and methods behind the environment and its rendering. RlGame is a gym.Env; each UAV's observation is a 7-dimensional vector (normalized x, y, speed, heading, target x, target y, and a flag), matching state_number=7 in the training scripts:
# -*- coding: utf-8 -*-
#开发者:Bright Fang
#开发时间:2023/7/20 23:30
import numpy as np
import os
import copy
import gym
from assignment import constants as C
from gym import spaces
import math
import random
import pygame
from assignment.components import player
from assignment import tools
from assignment.components import info
class RlGame(gym.Env):
def __init__(self, n,m,render=False):
self.hero_num = n
self.enemy_num = m
self.obstacle_num=1
self.goal_num=1
self.Render=render
self.game_info = {
'epsoide': 0,
'hero_win': 0,
'enemy_win': 0,
'win': '未知',
}
if self.Render:
pygame.init()
pygame.mixer.init()
self.SCREEN = pygame.display.set_mode((C.SCREEN_W, C.SCREEN_H))
pygame.display.set_caption("基于深度强化学习的空战场景无人机路径规划软件")
self.GRAPHICS = tools.load_graphics(".\\assignment\\source\\image")
self.SOUND = tools.load_sound(".\\assignment\\source\\music")
self.clock = pygame.time.Clock()
self.mouse_pos=(100,100)
pygame.time.set_timer(C.CREATE_ENEMY_EVENT, C.ENEMY_MAKE_TIME)
# self.res, init_extra, update_extra, skip_override, waypoints = simulate(filename='')
# else:
# self.dispaly=None
low = np.array([-1,-1])
high=np.array([1,1])
# self.action_space =spaces.Discrete(21)
# self.action_space = spaces.Discrete(2)
self.action_space=spaces.Box(low=low,high=high,dtype=np.float32)
# self.action_space = [spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),
# spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),
# spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),
# spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),
# spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),spaces.Discrete(2),]
def start(self):
# self.game_info=game_info
self.finished=False
# self.next='game_over'
self.set_battle_background()#战斗的背景
self.set_enemy_image()
self.set_hero_image()
self.set_obstacle_image()
self.set_goal_image()
self.info = info.Info('battle_screen',self.game_info)
# self.state = 'battle'
self.counter_1 = 0
self.counter_hero = 0
self.enemy_counter=0
self.enemy_counter_1 = 0
#又定义了一个参数,为了放在start函数里重置
self.enemy_num_start=self.enemy_num
self.trajectory_x,self.trajectory_y=[],[]
self.enemy_trajectory_x,self.enemy_trajectory_y=[[] for i in range(self.enemy_num)],[[] for i in range(self.enemy_num)]
# RL状态
# self.hero_state = np.zeros((self.hero_num, 4))
# self.hero_α = np.zeros((self.hero_num, 1))
self.uav_obs_check= np.zeros((self.hero_num, 1))
def set_battle_background(self):
self.battle_background = self.GRAPHICS['background']
self.battle_background = pygame.transform.scale(self.battle_background,C.SCREEN_SIZE)# 缩放
self.view = self.SCREEN.get_rect()
#若要移动的背景图像,请用下面的代码替换
# bg1=player.BackgroundSprite(image_name='background3',size=C.SCREEN_SIZE)
# bg2=player.BackgroundSprite(image_name='background3',size=C.SCREEN_SIZE)
# bg2.rect.y=-bg2.rect.height
# self.background_group=pygame.sprite.Group(bg1,bg2)
def set_hero_image(self):
self.hero = self.__dict__
self.hero_group = pygame.sprite.Group()
self.hero_image = self.GRAPHICS['fighter-blue']
for i in range(self.hero_num):
self.hero['hero'+str(i)]=player.Hero(image=self.hero_image)
self.hero_group.add(self.hero['hero'+str(i)])
def set_enemy_image(self):
self.enemy = self.__dict__
self.enemy_group = pygame.sprite.Group()
self.enemy_image = self.GRAPHICS['fighter-green']
for i in range(self.enemy_num):
self.enemy['enemy'+str(i)]=player.Enemy(image=self.enemy_image)
self.enemy_group.add(self.enemy['enemy'+str(i)])
def set_hero(self):
self.hero = self.__dict__
self.hero_group = pygame.sprite.Group()
for i in range(self.hero_num):
self.hero['hero'+str(i)]=player.Hero()
self.hero_group.add(self.hero['hero'+str(i)])
def set_enemy(self):
self.enemy = self.__dict__
self.enemy_group = pygame.sprite.Group()
for i in range(self.enemy_num):
self.enemy['enemy'+str(i)]=player.Enemy()
self.enemy_group.add(self.enemy['enemy'+str(i)])
def set_obstacle_image(self):
self.obstacle = self.__dict__
self.obstacle_group = pygame.sprite.Group()
self.obstacle_image = self.GRAPHICS['hole']
for i in range(self.obstacle_num):
self.obstacle['obstacle'+str(i)]=player.Obstacle(image=self.obstacle_image)
self.obstacle_group.add(self.obstacle['obstacle'+str(i)])
def set_obstacle(self):
self.obstacle = self.__dict__
self.obstacle_group = pygame.sprite.Group()
for i in range(self.obstacle_num):
self.obstacle['obstacle'+str(i)]=player.Obstacle()
self.obstacle_group.add(self.obstacle['obstacle'+str(i)])
def set_goal_image(self):
self.goal = self.__dict__
self.goal_group = pygame.sprite.Group()
self.goal_image = self.GRAPHICS['goal']
for i in range(self.goal_num):
self.goal['goal'+str(i)]=player.Goal(image=self.goal_image)
self.goal_group.add(self.goal['goal'+str(i)])
def set_goal(self):
self.goal = self.__dict__
self.goal_group = pygame.sprite.Group()
for i in range(self.goal_num):
self.goal['goal'+str(i)]=player.Goal()
self.goal_group.add(self.goal['goal'+str(i)])
def update_game_info(self):#死亡后重置数据
self.game_info['epsoide'] += 1
self.game_info['enemy_win'] = self.game_info['epsoide'] - self.game_info['hero_win']
def reset(self):#reset的仅是环境状态,
# obs=np.zeros((self.n, 4))#这是个二维矩阵,n*2维,现在只考虑一个己方无人机,所以现在是一个一维的
# game_info=self.my_game.state.game_info
# self.my_game.state.start(game_info)
if self.Render:
self.start()
else:
self.set_hero()
self.set_enemy()
self.set_goal()
self.set_obstacle()
self.team_counter = 0
self.done = False
self.hero_state = np.zeros((self.hero_num+self.enemy_num,7))
self.hero_α = np.zeros((self.hero_num, 1))
# self.goal_x,self.goal_y=random.randint(100, 500), random.randint(100, 200)
return np.array([[self.hero0.init_x/1000,self.hero0.init_y/1000,self.hero0.speed/30,self.hero0.theta*57.3/360
,self.goal0.init_x/1000, self.goal0.init_y/1000,0],
[self.enemy0.init_x / 1000, self.enemy0.init_y / 1000, self.enemy0.speed / 30,
self.enemy0.theta * 57.3 / 360
, self.hero0.init_x/1000, self.hero0.init_y/1000,self.hero0.speed / 30],
[self.enemy1.init_x / 1000, self.enemy1.init_y / 1000, self.enemy1.speed / 30,
self.enemy1.theta * 57.3 / 360
, self.hero0.init_x / 1000, self.hero0.init_y / 1000, self.hero0.speed / 30],
[self.enemy2.init_x / 1000, self.enemy2.init_y / 1000, self.enemy2.speed / 30,
self.enemy2.theta * 57.3 / 360
, self.hero0.init_x / 1000, self.hero0.init_y / 1000, self.hero0.speed / 30],
[self.enemy3.init_x / 1000, self.enemy3.init_y / 1000, self.enemy3.speed / 30,
self.enemy3.theta * 57.3 / 360
, self.hero0.init_x / 1000, self.hero0.init_y / 1000, self.hero0.speed / 30],
])#np.array(.posx/1000,self.my_game.state.hero['hero0'].posy/1000,self.my_game.state.hero['hero0'].speed/2,self.my_game.state.hero['hero0'].theta*57.3/360])#np.zeros((self.n,2)).flatten()
def step(self,action):
dis_1_obs = np.zeros((self.hero_num, 1))
dis_1_goal = np.zeros((self.hero_num+self.enemy_num, 1))
r=np.zeros((self.hero_num+self.enemy_num, 1))
o_flag = 0
o_flag1 = 0
#空气阻力系数
F_k=0.08
#无人机质量,100是像素与现实速度的比例,因为10像素/帧对应现实的100m/s
m=12000/100
#扰动的加速度
F_a=0
#边界奖励
edge_r=np.zeros((self.hero_num, 1))
edge_r_f = np.zeros((self.enemy_num, 1))
#避障奖励
obstacle_r = np.zeros((self.hero_num, 1))
obstacle_r1 = np.zeros((self.enemy_num, 1))
#目标奖励
goal_r = np.zeros((self.hero_num, 1))
# 编队奖励
follow_r = np.zeros((self.enemy_num, 1))
follow_r0 = 0
speed_r=0
# print(self.goal0.init_x)
dis_1_agent_0_to_1=math.hypot(self.hero0.posx - self.enemy0.posx, self.hero0.posy - self.enemy0.posy)
dis_1_agent_0_to_2 = math.hypot(self.hero0.posx - self.enemy1.posx, self.hero0.posy - self.enemy1.posy)
dis_1_agent_0_to_3 = math.hypot(self.hero0.posx - self.enemy2.posx, self.hero0.posy - self.enemy2.posy)
dis_1_agent_0_to_4 = math.hypot(self.hero0.posx - self.enemy3.posx, self.hero0.posy - self.enemy3.posy)
for i in range(self.hero_num+self.enemy_num):
#空气阻力
# self.hero['hero' + str(i)].F=F_k*math.pow(self.hero['hero' + str(i)].speed,2)
# F_a=(self.hero['hero' + str(i)].F/m)*math.cos(self.hero['hero' + str(i)].theta * 57.3)
# 己方与障碍物的碰撞检测
# self.hero['hero' + str(i)].enemies = pygame.sprite.spritecollide(self.hero['hero' + str(i)],
# self.obstacle_group, False)
if i==0:#leader
dis_1_obs = math.hypot(self.hero['hero' + str(i)].posx - self.obstacle0.init_x,
self.hero['hero' + str(i)].posy - self.obstacle0.init_y)
dis_1_goal = math.hypot(self.hero['hero' + str(i)].posx - self.goal0.init_x,
self.hero['hero' + str(i)].posy - self.goal0.init_y)
if self.hero['hero' + str(i)].posx <= C.ENEMY_AREA_X + 50:
edge_r = -1
elif self.hero['hero' + str(i)].posx >= C.ENEMY_AREA_WITH:
edge_r = -1
if self.hero['hero' + str(i)].posy >= C.ENEMY_AREA_HEIGHT:
edge_r = -1
elif self.hero['hero' + str(i)].posy <= C.ENEMY_AREA_Y + 50:
edge_r = -1
if 0 < dis_1_agent_0_to_1 < 50 and dis_1_agent_0_to_2<50 and dis_1_agent_0_to_3<50 and dis_1_agent_0_to_4<50:
follow_r0=0
self.team_counter+=1
if abs(self.hero0.speed-self.enemy0.speed)<1:
speed_r=1
else:
follow_r0=-0.001*dis_1_agent_0_to_1
if dis_1_goal < 40 and not self.hero['hero' + str(i)].dead:
goal_r = 1000.0
self.hero['hero' + str(i)].win = True
self.hero['hero' + str(i)].die()
self.done= True
# self.game_info['hero_win'] += 1
# self.update_game_info()
print('aa')
# elif dis_1_goal < 100:
# r = 1.0
elif dis_1_obs < 20 and not self.hero['hero' + str(i)].dead:
o_flag = 1
obstacle_r = -500
self.hero['hero' + str(i)].die()
self.hero['hero' + str(i)].win = False
self.done = True
# self.update_game_info()
print('gg')
elif dis_1_obs < 40 and not self.hero['hero' + str(i)].dead:
o_flag = 1
# print(-100000*math.pow(1/dis_1_obs,2))
obstacle_r = -2#-100000*math.pow(1/dis_1_obs,2)
elif not self.hero['hero' + str(i)].dead:
# print(math.exp(100/dis_1_goal)/10)
goal_r =-0.001 * dis_1_goal# math.exp(100/dis_1_goal)/10
r[i] = edge_r + obstacle_r + goal_r+speed_r+follow_r0
self.hero_state[i] = [self.hero['hero' + str(i)].posx / 1000, self.hero['hero' + str(i)].posy / 1000,
self.hero['hero' + str(i)].speed / 30,
self.hero['hero' + str(i)].theta * 57.3 / 360,
self.goal0.init_x / 1000, self.goal0.init_y / 1000, o_flag]
self.hero['hero' + str(i)].update(action, self.Render)
self.trajectory_x.append(self.hero['hero' + str(i)].posx)
self.trajectory_y.append(self.hero['hero' + str(i)].posy)
else:
dis_2_obs = math.hypot(self.enemy['enemy' + str(i-1)].posx - self.obstacle0.init_x,
self.enemy['enemy' + str(i-1)].posy - self.obstacle0.init_y)
dis_1_goal = math.hypot(self.enemy['enemy' + str(i-1)].posx - self.goal0.init_x,
self.enemy['enemy' + str(i-1)].posy- self.goal0.init_y)
if dis_2_obs < 40:
o_flag1 = 1
obstacle_r1 = -2
if self.enemy['enemy' + str(i-1)].posx <= C.ENEMY_AREA_X + 50:
edge_r_f = -1
elif self.enemy['enemy' + str(i-1)].posx >= C.ENEMY_AREA_WITH:
edge_r_f = -1
if self.enemy['enemy' + str(i-1)].posy >= C.ENEMY_AREA_HEIGHT:
edge_r_f = -1
elif self.enemy['enemy' + str(i-1)].posy <= C.ENEMY_AREA_Y + 50:
edge_r_f = -1
if 0 < dis_1_agent_0_to_1 < 50 and dis_1_goal<dis_1_goal:
# follow_r=5-abs(self.hero0.theta-self.enemy0.theta)
# print('hhh')
if abs(self.hero0.speed-self.enemy0.speed)<1:
speed_r=1
# print('hh')
else:
follow_r[i-1]=-0.001*dis_1_agent_0_to_1
r[i] =follow_r[i-1]+speed_r
self.hero_state[i] = [self.enemy['enemy' + str(i-1)].posx / 1000, self.enemy['enemy' + str(i-1)].posy / 1000,
self.enemy['enemy' + str(i-1)].speed / 30,
self.enemy['enemy' + str(i-1)].theta * 57.3 / 360,
self.hero0.posx / 1000, self.hero0.posy / 1000,self.hero0.speed / 30 ]
self.enemy['enemy' + str(i-1)].update(action, self.Render)
self.enemy_trajectory_x[i-1].append(self.enemy['enemy' + str(i-1)].posx)
self.enemy_trajectory_y[i-1].append(self.enemy['enemy' + str(i-1)].posy)
# print(self.hero_state)
# init_to_goal=math.atan2((-150+self.hero['hero'+str(i)].init_y),(200-self.hero['hero'+str(i)].init_x))
# uav_to_goal = math.atan2((-self.goal0.init_y + self.hero['hero' + str(i)].posy), (self.goal0.init_x - self.hero['hero' + str(i)].posx))
# uav_to_obstacle = math.atan2((-self.obstacle0.init_y + self.hero['hero' + str(i)].posy), (self.obstacle0.init_x - self.hero['hero' + str(i)].posx))
# self.hero_α =0.1*abs(uav_to_obstacle - self.hero['hero' + str(i)].theta)
# 自己更新位置
# self.hero_group.update(action, action,self.Render)
hero_state = copy.deepcopy(self.hero_state)
done = copy.deepcopy(self.done)
return hero_state,r,done,self.hero['hero0'].win,self.team_counter,dis_1_agent_0_to_1
def render(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.display.quit()
quit()
elif event.type == pygame.MOUSEMOTION:
self.mouse_pos = pygame.mouse.get_pos()
elif event.type == C.CREATE_ENEMY_EVENT:
C.ENEMY_FLAG = True
# 画背景
self.SCREEN.blit(self.battle_background, self.view)
# 文字显示
self.info.update(self.mouse_pos)
# 画图
self.draw(self.SCREEN)
pygame.display.update()
self.clock.tick(C.FPS)
def draw(self,surface):
# self.background_group.draw(surface)
#敌占区的矩形
pygame.draw.rect(surface, C.BLACK, C.ENEMY_AREA, 3)
#目标星星
# pygame.draw.polygon(surface, C.GREEN,[(200, 135), (205, 145), (215, 145), (210, 155), (213, 165), (200, 160), (187, 165), (190, 155), (185, 145), (195, 145)])
pygame.draw.circle(surface, C.RED, (self.goal0.init_x, self.goal0.init_y), 1)
pygame.draw.circle(surface, C.RED, (self.goal0.init_x, self.goal0.init_y), 40,1)
# pygame.draw.circle(surface, C.GREEN, (self.goal0.init_x, self.goal0.init_y),100, 1)
pygame.draw.circle(surface, C.BLACK, (self.obstacle0.init_x, self.obstacle0.init_y), 20, 1)
# 画轨迹
for i in range(1, len(self.trajectory_x)):
pygame.draw.line(surface, C.BLUE, (self.trajectory_x[i-1], self.trajectory_y[i-1]), (self.trajectory_x[i], self.trajectory_y[i]))
for j in range(self.enemy_num):
for i in range(1, len(self.enemy_trajectory_x[j])):
pygame.draw.line(surface, C.GREEN, (self.enemy_trajectory_x[j][i-1], self.enemy_trajectory_y[j][i-1]),
(self.enemy_trajectory_x[j][i], self.enemy_trajectory_y[j][i]))
#障碍物
# pygame.draw.circle(surface, C.BLACK, (250, 300), 20)
# 画自己
self.hero_group.draw(surface)
self.enemy_group.draw(surface)
#障碍物
self.obstacle_group.draw(surface)
# 目标星星
self.goal_group.draw(surface)
#画文字信息
self.info.draw(surface)
def close(self):
pygame.display.quit()
quit()
Then run main_SAC.py directly.
https://i-blog.csdnimg.cn/direct/8f2cd47f45424aef9f625be92dd7be63.png
Perfect!
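One practical note based on the listings above: the Switch flag near the top of main_SAC.py (and main_DDPG.py) selects which branch of run() executes, so set it according to whether you want to train from scratch or evaluate saved weights:
# in main_SAC.py
Switch = 0   # training branch: prints 'SAC训练中...' and periodically saves Path_SAC_actor_L1.pth / Path_SAC_actor_F1.pth
Switch = 1   # test branch: prints 'SAC测试中...', loads those .pth files and reports the success rate and formation metrics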