I shared a simulation project earlier, but it was implemented with the policy gradient method. Since implementing DQN from scratch is a bit challenging for many people, this time I'm sharing a simulation project based on DQN, and it is very simple.
It is simple mainly because it is implemented in PyTorch, and the differences between PyTorch versions are small, so the code can be reused across them.
The modeling here is not as complex as before. First comes the task class:
```python
import time

class Task(object):
    # A single cloud task
    def __init__(self, jobID, index, CPU, RAM, disk, runtime, status):
        self.parent = []
        self.child = []
        self.jobID = jobID
        self.index = index
        self.CPU = CPU
        self.RAM = RAM
        self.disk = disk
        self.status = status  # -1: rejected, 0: finished, 1: ready, 2: running
        self.runtime = runtime
        self.ddl = time.time() + self.runtime * 5
        self.endtime = 0
```
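As a quick sanity check, here is what creating a task might look like. All field values below are made up for illustration; the real ones come from the trace file parsed by the DAG class further down.

```python
import time

# Hypothetical values, just to exercise the constructor above
t = Task(jobID='123', index='0', CPU=0.03, RAM=0.01,
         disk=0.001, runtime=42.0, status=1)
print(t.ddl - time.time())  # roughly runtime * 5 seconds from now
print(t.parent, t.child)    # dependencies get filled in later by the DAG
```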
Next we build the DAG. Most tasks in cloud computing are interdependent, so they form a directed acyclic graph:
```python
import random

class DAG(object):
    def __init__(self, fname, num_task):
        self.fname = fname
        # number of tasks to read
        self.num_task = num_task
        self.job = []
        self.task = []

    def readfile(self):
        # Read the task trace; lines starting with 'J' separate jobs
        num_task = 0
        with open(self.fname, 'r') as f:
            task = []
            for line in f:
                if line[0] == 'J':
                    if len(task) != 0:
                        self.job.append(task)
                        task = []
                else:
                    info = list(line.split(','))
                    # Task fields: jobID and index identify the task; CPU, RAM and
                    # disk are resource demands; plus runtime and an initial status
                    # Task(jobID, index, CPU, RAM, disk, runtime, status)
                    task.append(Task(info[5], info[6], float(info[3]), float(info[4]),
                                     float(info[8]), float(info[2]), 1))
                    num_task += 1
                    if num_task == self.num_task:
                        break
            if len(task) != 0:
                self.job.append(task)

    def checkRing(self, parent, child):
        # Return True if adding the edge parent -> child would create a cycle
        if parent.index == child.index:
            return True
        if len(child.child) == 0:
            return False
        for c in child.child:
            if self.checkRing(parent, c):
                return True
        return False

    def buildDAG(self):
        # Randomly wire up parent/child edges within each job, keeping the graph acyclic
        for job in self.job:
            for task in job:
                i = random.randint(-len(job), len(job) - 1)
                if i < 0:
                    continue
                parent = job[i]
                if self.checkRing(parent, task) == False:
                    task.parent.append(parent)
                    parent.child.append(task)
    # ……
    # ……
```
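Here is a minimal usage sketch of the DAG class. The trace file name `task_trace.csv` is hypothetical (the post does not give one): read the first 50 tasks, build the graph, then print each task's dependencies.

```python
# File name is an assumption; any trace in the expected format works
dag = DAG('task_trace.csv', num_task=50)
dag.readfile()   # parse the first 50 tasks, grouped into jobs
dag.buildDAG()   # randomly add acyclic parent/child edges within each job
for job in dag.job:
    for task in job:
        print(task.jobID, task.index,
              'parents:', [p.index for p in task.parent],
              'children:', [c.index for c in task.child])
```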
Then the environment class, which defines the cloud computing resources, the state transitions during scheduling, the training process, and so on:
```python
class environment(object):
    def __init__(self, scale, fname, num_task, num_server):
        self.scale = scale
        self.fname = fname
        self.task = []
        self.dag = DAG(self.fname, num_task)  # build the DAG from num_task tasks
        # number of virtual machines per server
        self.VMNum = 5
        self.rej = 0
        # the numbers of tasks and servers are passed in as parameters
        self.num_task = num_task
        self.severNum = num_server
        # while the number of server farms is derived (up to 50 servers per farm)
        if num_server <= 50:
            self.farmNum = 1
        else:
            if int(self.severNum / 50) * 50 < num_server:
                self.farmNum = int(self.severNum / 50) + 1
            else:
                self.farmNum = int(self.severNum / 50)
        self.remainFarm = []
        self.FarmResources = []
        self.severs = [[1, 1] for _ in range(self.severNum)]
        self.VMtask = []
        self.totalcost = 0
        # self.init_severs(num_server)
        self.losses_stage1 = []
        self.losses_stage2 = []
        print("Total Number of tasks: {0}".format(num_task))

    def init_severs(self, severNum):
        # Each server (host) is split into VMNum virtual machines, each holding an
        # equal share of [CPU, RAM]; the VMs are what actually process the tasks
        VM = [[[1.0 / self.VMNum, 1.0 / self.VMNum] for _ in range(self.VMNum)]
              for _ in range(severNum)]
        self.VMtask.append([[[] for _ in range(self.VMNum)] for _ in range(severNum)])
        return VM
    # ……
    # ……
```
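A minimal sketch of constructing the environment, with hypothetical argument values (the post only shows the constructor and `init_severs`; the scheduling and step methods are elided above):

```python
# All argument values are assumptions; the file name matches the DAG sketch above
env = environment(scale='small', fname='task_trace.csv',
                  num_task=50, num_server=60)
print(env.farmNum)            # 60 servers -> 2 farms (up to 50 servers each)
VM = env.init_severs(env.severNum)
print(len(VM), len(VM[0]))    # 60 servers, 5 VMs per server
```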
Finally, we build the DQN agent. It computes and updates Q-values, which is what makes this a value-based reinforcement learning method:
```python
import numpy as np
import torch as T

class Agent():
    def __init__(self, input_dims, n_actions, lr, gamma=0.99,
                 epsilon=1.0, eps_dec=1e-5, eps_min=0.01):
        self.lr = lr
        self.input_dims = input_dims
        self.n_actions = n_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_dec = eps_dec
        self.eps_min = eps_min
        self.action_space = [i for i in range(self.n_actions)]
        self.Q = LinearDeepQNetwork(self.lr, self.n_actions, self.input_dims)
        self.losses = []

    def choose_action(self, state):
        # epsilon-greedy action selection
        if np.random.random() > self.epsilon:
            state1 = T.tensor(state, dtype=T.float).to(self.Q.device)
            actions = self.Q.forward(state1)
            # exploit: pick the action with the largest Q-value
            action = T.argmax(actions).item()
        else:
            # explore: pick a random action
            action = np.random.choice(self.action_space)
        return action

    def decrement_epsilon(self):
        # anneal the exploration rate toward its minimum
        self.epsilon = self.epsilon - self.eps_dec \
            if self.epsilon > self.eps_min else self.eps_min

    def learn(self, state, action, reward, state_):
        self.Q.optimizer.zero_grad()
        states = T.tensor(state, dtype=T.float).to(self.Q.device)
        actions = T.tensor(action).to(self.Q.device)
        rewards = T.tensor(reward).to(self.Q.device)
        states_ = T.tensor(state_, dtype=T.float).to(self.Q.device)

        q_pred = self.Q.forward(states)[actions]
        q_next = self.Q.forward(states_).max()
        q_target = rewards + self.gamma * q_next

        loss = self.Q.loss(q_target, q_pred).to(self.Q.device)
        loss.backward()
        self.Q.optimizer.step()
        self.decrement_epsilon()
        self.losses.append(loss.item())
```
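The agent references a `LinearDeepQNetwork` that the post does not show. Below is a minimal sketch of what such a network typically looks like, matching the attributes the Agent uses (`device`, `optimizer`, `loss`, `forward`); the hidden layer size and optimizer choice are my assumptions, not the author's code.

```python
import torch as T
import torch.nn as nn
import torch.optim as optim

class LinearDeepQNetwork(nn.Module):
    # A plain two-layer MLP mapping a state vector to one Q-value per action
    def __init__(self, lr, n_actions, input_dims, hidden=128):
        super(LinearDeepQNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dims, hidden)
        self.fc2 = nn.Linear(hidden, n_actions)
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state):
        x = T.relu(self.fc1(state))
        return self.fc2(x)
```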
On top of this foundation, you can go on to implement optimizations such as fixed Q-targets, experience replay, and Double DQN.
I also added code to print the loss values.
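As a starting point for those optimizations, here is a hedged sketch of a simple experience replay buffer, plus a comment on how a fixed Q-target network could be wired in. The class name and default capacity are my assumptions and are not part of the original project.

```python
import random
from collections import deque

class ReplayBuffer:
    # Stores (state, action, reward, next_state) transitions and samples random
    # mini-batches, which breaks the correlation between consecutive steps
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, state_):
        self.buffer.append((state, action, reward, state_))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

# For a fixed Q-target, keep a second LinearDeepQNetwork and copy the online
# network's weights into it only every N learning steps, e.g.:
#   self.Q_target.load_state_dict(self.Q.state_dict())
```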

So, for ease of running the program, reusing it later, and modifying it, I recommend implementing it with PyTorch.