云计算任务调理仿真02

打印 上一主题 下一主题

主题 907|帖子 907|积分 2721

前面已经分享过一个仿真项目,但是基于policy gradient方法实现的,考虑到很多人从零到一实现DQN方法有点难度,以是这次分享一个基于DQN实现的仿真项目,非常简单。

这里之以是简单主要得益于它是用pytorch实现的,而pytorch各个版本之间差异不是非常大,可以互用。
这里没有之前那么复杂的建模,首先是任务类
  1. class Task(object):
  2.     # 任务类
  3.     def __init__(self, jobID, index, CPU, RAM, disk, runtime, status):
  4.         import time
  5.         self.parent = []
  6.         self.child = []
  7.         self.jobID = jobID
  8.         self.index = index
  9.         self.CPU = CPU
  10.         self.RAM = RAM
  11.         self.disk = disk
  12.         self.status = status  # -1: rejected, 0: finished, 1: ready, 2: running
  13.         self.runtime = runtime
  14.         self.ddl = time.time() + self.runtime * 5
  15.         self.endtime = 0
复制代码
然后构建DAG,因为云计算中的任务大多是具有关联性的,是有向无环图
  1. class DAG(object):
  2.     def __init__(self, fname, num_task):
  3.         self.fname = fname
  4.         # 任务数量
  5.         self.num_task = num_task
  6.         self.job = []
  7.         self.task = []
  8.     def readfile(self):
  9.         # 读取任务数据
  10.         num_task = 0
  11.         with open(self.fname, 'r') as f:
  12.             task = []
  13.             for line in f:
  14.                 if line[0] == 'J':
  15.                     if len(task) != 0:
  16.                         self.job.append(task)
  17.                         task = []
  18.                 else:
  19.                     info = list(line.split(','))
  20.                     # 任务的信息,jobid,index就是任务的标识,cpu,内存,硬盘,
  21.                     # 外加一个状态jobID, index, CPU, RAM, disk, runtime, status)
  22.                     task.append \
  23.                         (Task(info[5], info[6], float(info[3]), float(info[4]), float(info[8]), float(info[2]), 1))
  24.                     num_task += 1
  25.                 if num_task == self.num_task:
  26.                     break
  27.             if len(task) != 0:
  28.                 self.job.append(task)
  29.     def checkRing(self, parent, child):
  30.         # 检查无环
  31.         if parent.index == child.index:
  32.             return True
  33.         if len(child.child) == 0:
  34.             return False
  35.         for c in child.child:
  36.             if self.checkRing(parent, c):
  37.                 return True
  38.         return False
  39.     def buildDAG(self):
  40.         # 构建有向无环图
  41.         import random
  42.         for job in self.job:
  43.             for task in job:
  44.                 i = random.randint(-len(job), len(job) - 1)
  45.                 if i < 0:
  46.                     continue
  47.                 parent = job[i]
  48.                 if self.checkRing(parent, task) == False:
  49.                     task.parent.append(parent)
  50.                     parent.child.append(task)
  51. ……
  52. ……
复制代码
环境类,定义云计算资源,以及调理过程中状态的转移,练习过程等等
  1. class environment(object):
  2.     def __init__(self, scale, fname, num_task, num_server):
  3.         self.scale = scale
  4.         self.fname = fname
  5.         self.task = []
  6.         self.dag = DAG(self.fname, num_task)  # 根据task数量构建dag
  7.         # 设置每个服务器上虚拟机的数量
  8.         self.VMNum = 5
  9.         self.rej = 0
  10.         # 任务数量和服务器数量是通过参数传递的
  11.         self.num_task = num_task
  12.         self.severNum = num_server
  13.         # 而集群数量是通过计算出来的
  14.         if num_server <= 50:
  15.             self.farmNum = 1
  16.         else:
  17.             if int(self.severNum / 50) * 50 < num_server:
  18.                 self.farmNum = int(self.severNum / 50) + 1
  19.             else:
  20.                 self.farmNum = int(self.severNum / 50)
  21.         self.remainFarm = []
  22.         self.FarmResources = []
  23.         self.severs = [[1, 1] for _ in range(self.severNum)]
  24.         self.VMtask = []
  25.         self.totalcost = 0
  26.         #self.init_severs(num_server)
  27.         self.losses_stage1 = []
  28.         self.losses_stage2 = []
  29.         print("Total Number of tasks: {0}".format(num_task))
  30.     def init_severs(self, severNum):
  31.         # 服务器,host,每个host上又可以虚拟出一定的虚拟机,然后虚拟机处理任务
  32.         VM = [[[1.0 / self.VMNum, 1.0 / self.VMNum] for _ in range(self.VMNum)] for _ in range(severNum)]
  33.         self.VMtask.append([[[] for _ in range(self.VMNum)] for _ in range(severNum)])
  34.         return VM
  35. ……
  36. ……
复制代码
构建DQN的智能体,有Q值的计算和更新,才是基于值的强化学习方法
  1. class Agent():
  2.     def __init__(self, input_dims, n_actions, lr, gamma=0.99,
  3.                  epsilon=1.0, eps_dec=1e-5, eps_min=0.01):
  4.         self.lr = lr
  5.         self.input_dims = input_dims
  6.         self.n_actions = n_actions
  7.         self.gamma = gamma
  8.         self.epsilon = epsilon
  9.         self.eps_dec = eps_dec
  10.         self.eps_min = eps_min
  11.         self.action_space = [i for i in range(self.n_actions)]
  12.         self.Q = LinearDeepQNetwork(self.lr, self.n_actions, self.input_dims)
  13.         self.losses = []
  14.     def choose_action(self, state):
  15.         if np.random.random() > self.epsilon:
  16.             state1 = T.tensor(state, dtype=T.float).to(self.Q.device)
  17.             actions = self.Q.forward(state1)
  18.             #选最大的动作执行
  19.             action = T.argmax(actions).item()
  20.         else:
  21.             action = np.random.choice(self.action_space)
  22.         return action
  23.     def decrement_epsilon(self):
  24.         #贪心的变化
  25.         self.epsilon = self.epsilon - self.eps_dec \
  26.                         if self.epsilon > self.eps_min else self.eps_min
  27.     def learn(self, state, action, reward, state_):
  28.         self.Q.optimizer.zero_grad()
  29.         states = T.tensor(state, dtype=T.float).to(self.Q.device)
  30.         actions = T.tensor(action).to(self.Q.device)
  31.         rewards = T.tensor(reward).to(self.Q.device)
  32.         states_ = T.tensor(state_, dtype=T.float).to(self.Q.device)
  33.         q_pred = self.Q.forward(states)[actions]
  34.         q_next = self.Q.forward(states_).max()
  35.         q_target = reward + self.gamma*q_next
  36.         loss = self.Q.loss(q_target, q_pred).to(self.Q.device)
  37.         loss.backward()
  38.         self.Q.optimizer.step()
  39.         self.decrement_epsilon()
  40.         self.losses.append(loss.item())
复制代码
在此底子上,可以继续实现fixed-q-target和experience replay以及double QDN等优化
我添加了打印损失函数值的代码

以是为了方便程序的运行和跨时间段使用,修改等,发起用pytorch举行实现

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

数据人与超自然意识

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表