mit6.824 lab1 (2022)

打印 上一主题 下一主题

主题 838|帖子 838|积分 2529

  1. <br> lab1 要求按照论文实现一个mapReduce 框架<br>
复制代码
  1.  lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
复制代码
  论文:https://zhuanlan.zhihu.com/p/122571315
  在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。
  1.  
复制代码
  1. [/code] 
  2.  
  3. 整体框架流程:
  4.   [img]https://img2022.cnblogs.com/blog/1729513/202207/1729513-20220708145109475-401531304.png[/img]
  5.  
  6.  
  7.  
  8.  
  9.  
  10.  Coordinator 是协调器,负责
  11.           ① 给woker分发任务
  12.           ② 合并由map任务执行产生的中间文件
  13.           ③ 任务超时重新分配任务
  14.  
  15. woker 是工作器,负责
  16.         ①循环申请map 或reduce任务
  17.    
  18.  
  19. 先看woker:
  20.   worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务
  21. [code]//申请任务
  22. for {
  23.    args := Args{}
  24.    args.Signal = REQUEST_WORKER<br><br>   reply := RpcCall(args)
  25.    switch reply.STATUS {
  26.    case COORDINATOR_MAP:    //获得map任务
  27.      MapHandle(&reply,mapf)<br>
  28.    case COORDINATOR_REDUCE: //获得reduce任务
  29.     ReduceHandle(&reply,reducef)
复制代码
  1.   case COORDINATOR__MAP_END:  //没申请到任务重新获取<br>     continue<br><br>  case END:                //结束<br>     return
复制代码
  1.   }  
复制代码
  
  1. Recude任务<br>  处理方式和mrsequential.go中几乎是一样的不多说了。<br><br>map任务<br>  会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)<br><br>
复制代码
 
  1. [/code][img]https://img2022.cnblogs.com/blog/1729513/202207/1729513-20220708151531201-347578160.png[/img]
  2.  
  3.  kva是通过mapf 对文件处理得到的数据。
  4. 我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。
  5. [code]kva := MapMachingFile(reply.FileName, mapf)
  6.         midFileName := "mr-out-" + reply.FileName
  7.         chanArray := make([]chan KeyValue, 10)
  8.         for i := 0; i < 10; i++ {
  9.                 chanArray[i] = make(chan KeyValue, 10)
  10.         }<br><br>        //开启reduceNumber个文件写入线程
  11.         var w sync.WaitGroup
  12.         var mapW sync.WaitGroup
  13.         w.Add(reply.Neduce)
  14.         mapW.Add(2)
  15.         for i := 0; i < 10; i++ {
  16.                 go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w)
  17.         }<br><br>        // 开启分发线程,分发数据到文件写入线程
  18.         lenght := len(kva)
  19.         go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW)
  20.         go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW)<br><br>        //所有分发线程结束
  21.         mapW.Wait()
  22.         for cIndex := 0; cIndex < 10; cIndex++ {
  23.                 close(chanArray[cIndex])
  24.         }<br><br>        //所有文件写入线程结束
  25.         w.Wait()
复制代码
  
 
worker结束剩下看Coordinator 。
 
  1. 1 type Coordinator struct {
  2. 2     filebit                    //数据分发记录
  3. 3     Nreduce       int
  4. 4     midFileMergeC chan int
  5. 5     Mergefiled             //已处理数据记录
  6. 6     monitorC      []chan int //监听每个worker是否按时完成
  7. 7     STATUS
  8. 8     RedeceS
  9. 9     *sync.Mutex
  10. 10     End            bool
  11. 11 }
复制代码
 
  1. Coordinator  结构记录的信息主要为三部分<br>        2、3、4、5行记录map相关<br>        6 为监听chan,监听任务是否超时<br>        7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束<br><br><br><br>判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。
复制代码
  1. //信号处理
  2. func (c *Coordinator) SignalTask (args *Args, reply *Reply) error {
  3.     switch args.Signal {
  4.     case REQUEST_WORKER:
  5.         c.distributeTask(args,reply)<br><br>    //中间文件处理
  6.     case COMPLETE:
  7.         c.midFileMerge(args,reply)
  8.     }<br><br>    return nil
  9. }
复制代码
  1. [/code] 在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给[i]Coordinator,[/i][i]Coordinator通过chan将数据发给每个文件合并线程[/i]StartMergeFile。
  2. [i]举个例子[/i]
  3. [i]workerMap A产生了  1,2,3 个中间文件[/i]
  4. [i]1号文件 合并到 [i]mr-out-m-1[/i][/i]
  5. [i][i][i]2号文件 合并到 [i]mr-out-m-2[/i][/i][/i][/i]
  6. [i]3号文件 合并到 [i]mr-out-m-3[/i][/i]
  7.  
  8. [i]workerMap  B 又产生1、2、3个中间文件[/i]
  9. [i]1号文件 合并到 [i]mr-out-m-1[/i][/i]
  10. [i][i][i]2号文件 合并到 [i]mr-out-m-2[/i][/i][/i][/i]
  11. [i]3号文件 合并到 [i]mr-out-m-3[/i][/i]
  12. [i][i] [/i][/i][code]<br><br>
复制代码
  1. //开启Nreduce个中间文件写入线程
  2. //返回文件写入chan 切片
  3. func (c *Coordinator)runFileWorker () []chan int {
  4.         cLi := make([]chan int,c.Nreduce)
  5.         for i := 0 ; i < c.Nreduce ; i ++ {
  6.                 cLi[i] = make(chan int,10)
  7.         }
  8.         for fid := 0 ; fid < c.Nreduce ; fid ++ {
  9.                 go c.StartMergeFile(fid,cLi[fid])
  10.         }
  11.         return cLi
  12. }
复制代码
  
  1. <br>记录信息是否已处理的结构:<br>
复制代码
  1.    filebit 、ReduceS 核心是通过一个简单的bitmap实现的
复制代码
  1. type filebit struct{
  2.         rw                 *sync.Mutex
  3.         bitMap
  4.         file []string
  5. }
  6. type RedeceS struct {
  7.         filebit
  8. }
复制代码
  
[code][/code][code]type bitMap struct {        bit  int16        size int}//获取一个未使用位置func (b *bitMap) GetOne() int {        for i := int(0) ; i < b.size ; i ++ {                if b.isZero(i) {                        b.seTUsed(i)                        return i                }        }        //这里超过size限制会直接报错        return -1}//第i位是否为0//为0未使用func (b *bitMap) isZero (index int) bool {        return ((1

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连全瓷种植牙齿制作中心

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表