- <br> lab1 要求按照论文实现一个mapReduce 框架<br>
复制代码- lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
复制代码 论文:https://zhuanlan.zhihu.com/p/122571315
在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。- [/code]
-
- 整体框架流程:
- [img]https://img2022.cnblogs.com/blog/1729513/202207/1729513-20220708145109475-401531304.png[/img]
-
-
-
-
-
- Coordinator 是协调器,负责
- ① 给woker分发任务
- ② 合并由map任务执行产生的中间文件
- ③ 任务超时重新分配任务
-
- woker 是工作器,负责
- ①循环申请map 或reduce任务
-
-
- 先看woker:
- worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务
- [code]//申请任务
- for {
- args := Args{}
- args.Signal = REQUEST_WORKER<br><br> reply := RpcCall(args)
- switch reply.STATUS {
- case COORDINATOR_MAP: //获得map任务
- MapHandle(&reply,mapf)<br>
- case COORDINATOR_REDUCE: //获得reduce任务
- ReduceHandle(&reply,reducef)
复制代码- case COORDINATOR__MAP_END: //没申请到任务重新获取<br> continue<br><br> case END: //结束<br> return
复制代码 - Recude任务<br> 处理方式和mrsequential.go中几乎是一样的不多说了。<br><br>map任务<br> 会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)<br><br>
复制代码 - [/code][img]https://img2022.cnblogs.com/blog/1729513/202207/1729513-20220708151531201-347578160.png[/img]
-
- kva是通过mapf 对文件处理得到的数据。
- 我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。
- [code]kva := MapMachingFile(reply.FileName, mapf)
- midFileName := "mr-out-" + reply.FileName
- chanArray := make([]chan KeyValue, 10)
- for i := 0; i < 10; i++ {
- chanArray[i] = make(chan KeyValue, 10)
- }<br><br> //开启reduceNumber个文件写入线程
- var w sync.WaitGroup
- var mapW sync.WaitGroup
- w.Add(reply.Neduce)
- mapW.Add(2)
- for i := 0; i < 10; i++ {
- go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w)
- }<br><br> // 开启分发线程,分发数据到文件写入线程
- lenght := len(kva)
- go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW)
- go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW)<br><br> //所有分发线程结束
- mapW.Wait()
- for cIndex := 0; cIndex < 10; cIndex++ {
- close(chanArray[cIndex])
- }<br><br> //所有文件写入线程结束
- w.Wait()
复制代码
worker结束剩下看Coordinator 。
- 1 type Coordinator struct {
- 2 filebit //数据分发记录
- 3 Nreduce int
- 4 midFileMergeC chan int
- 5 Mergefiled //已处理数据记录
- 6 monitorC []chan int //监听每个worker是否按时完成
- 7 STATUS
- 8 RedeceS
- 9 *sync.Mutex
- 10 End bool
- 11 }
复制代码 - Coordinator 结构记录的信息主要为三部分<br> 2、3、4、5行记录map相关<br> 6 为监听chan,监听任务是否超时<br> 7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束<br><br><br><br>判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。
复制代码- //信号处理
- func (c *Coordinator) SignalTask (args *Args, reply *Reply) error {
- switch args.Signal {
- case REQUEST_WORKER:
- c.distributeTask(args,reply)<br><br> //中间文件处理
- case COMPLETE:
- c.midFileMerge(args,reply)
- }<br><br> return nil
- }
复制代码- [/code] 在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给[i]Coordinator,[/i][i]Coordinator通过chan将数据发给每个文件合并线程[/i]StartMergeFile。
- [i]举个例子[/i]
- [i]workerMap A产生了 1,2,3 个中间文件[/i]
- [i]1号文件 合并到 [i]mr-out-m-1[/i][/i]
- [i][i][i]2号文件 合并到 [i]mr-out-m-2[/i][/i][/i][/i]
- [i]3号文件 合并到 [i]mr-out-m-3[/i][/i]
-
- [i]workerMap B 又产生1、2、3个中间文件[/i]
- [i]1号文件 合并到 [i]mr-out-m-1[/i][/i]
- [i][i][i]2号文件 合并到 [i]mr-out-m-2[/i][/i][/i][/i]
- [i]3号文件 合并到 [i]mr-out-m-3[/i][/i]
- [i][i] [/i][/i][code]<br><br>
复制代码- //开启Nreduce个中间文件写入线程
- //返回文件写入chan 切片
- func (c *Coordinator)runFileWorker () []chan int {
- cLi := make([]chan int,c.Nreduce)
- for i := 0 ; i < c.Nreduce ; i ++ {
- cLi[i] = make(chan int,10)
- }
- for fid := 0 ; fid < c.Nreduce ; fid ++ {
- go c.StartMergeFile(fid,cLi[fid])
- }
- return cLi
- }
复制代码 - filebit 、ReduceS 核心是通过一个简单的bitmap实现的
复制代码- type filebit struct{
- rw *sync.Mutex
- bitMap
- file []string
- }
- type RedeceS struct {
- filebit
- }
复制代码
[code][/code][code]type bitMap struct { bit int16 size int}//获取一个未使用位置func (b *bitMap) GetOne() int { for i := int(0) ; i < b.size ; i ++ { if b.isZero(i) { b.seTUsed(i) return i } } //这里超过size限制会直接报错 return -1}//第i位是否为0//为0未使用func (b *bitMap) isZero (index int) bool { return ((1 |