mit 6.824 lab1分析

打印 上一主题 下一主题

主题 654|帖子 654|积分 1962

6.824 lab1 笔记

1. 阅读论文


2. 官网rules & hints

2.1 rules


  • map阶段每个worker应该把中间文件分成nReduce份,nReduce是reduce任务的数量
  • worker完成reduce任务后生成文件名mr-out-X
  • mr-out-X文件每行应该是"%v %v"格式,参考main/mrsequential.go
  • worker处理完map任务,应该把生成的中间文件放到当前目录中,便于worker执行reduce任务时读取中间文件
  • 当所有任务完成时,Done()函数应该返回true,使得coordinator退出
  • 所有任务完成时,worker应该退出,方法是:

    • 当worker调用rpc向coordinator请求任务时,连接不上coordinator,此时可以认为coordinator已经退出因为所有任务已经完成了
    • 当worker调用rpc向coordinator请求任务时,coordinator可以向其回复所有任务已经完成

2.2 hints


  • 刚开始可以修改mr/worker.go's ``Worker()向coordinator 发送一个RPC请求一个任务。然后修改coordinator回复一个文件名,代表空闲的map任务。然后worker根据文件名读取文件,调用wc.so-Map函数,调用Map函数可参考mrsequential.go`
  • 如果修改了mr/目录下任何文件,应该重新build MapReduce plugins,go build -buildmode=plugin ../mrapps/wc.go
  • worker处理完map任务后产生的中间文件命名格式mr-X-Y,x是map任务的编号,y是reduce任务编号。
    1. // 初始文件,通过命令行传入的,如
    2. // pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt
    3. // len(files) = 3 nReduce = 4
    4. // 中间文件  x:map任务的编号 y:reduce任务编号
    5. // mr-0-0 mr-1-0 mr-2-0
    6. // mr-0-1 mr-1-1 mr-2-1
    7. // mr-0-2 mr-1-2 mr-2-2
    8. // mr-0-3 mr-1-3 mr-2-3
    复制代码
  • map任务存储数据到文件可以使用json格式,便于reduce任务读取
    1.   // map
    2.   enc := json.NewEncoder(file)
    3.   for _, kv := ... {
    4.     err := enc.Encode(&kv)
    5.       
    6.   // reduce
    7.   dec := json.NewDecoder(file)
    8.   for {
    9.     var kv KeyValue
    10.     if err := dec.Decode(&kv); err != nil {
    11.       break
    12.     }
    13.     kva = append(kva, kv)
    14.   }
    复制代码
  • map阶段使用ihash(key)函数把key映射到哪个reduce任务,如某个worker取得了2号map任务,ihash("apple") = 1,那么就应该把该key放到mr-2-1文件中
  • 可以参考mrsequential.go代码:读取初始输入文件、排序key、存储reduce输出文件
  • coordinator是rpc server,将会被并发访问,需要对共享变量加锁
  • 若当前未有空闲的map任务可以分配,worker应该等待一段时间再请求任务,若worker频繁请求任务,coordinator就会频繁加锁、访问数据、释放锁,浪费资源和时间。如使用time.Sleep(),worker可以每隔一秒发送一次请求任务rpc
  • coordinator无法辨别某个worker是否crash,有可能某个worker还在运行,但是运行极其慢(由于硬件损坏等原因),最好的办法是:coordinator监控某个任务,若该任务未在规定时间内由worker报告完成,那么coordinator可以把该任务重新分配给其他worker,该lab规定超时时间是10s
  • 为了确保某个worker在写入文件时,不会有其他worker同时写入;又或者是某个worker写入文件时中途退出了,只写了部分数据,不能让这个没写完的文件让其他worker看到。可以使用临时文件ioutil.TempFile,当写入全部完成时,再使用原子重命名os.Rename。
  • Go RPC只能传struct中大写字母开头的变量
  • 调用RPC call() 函数时,reply struct应该为空,不然会报错
    1.   reply := SomeType{}
    2.   call(..., &reply)
    复制代码
3. 架构设计


3.1 RPC设计

在该lab中,我们需要两个RPC,一个是callTask RPC向coordinator请求一个任务,一个是callTaskDone RPC向coordinator报告某个任务的完成,以下皆在rpc.go中定义

  • 首先定义一个枚举变量,表示coordinator给worker分配的任务类型,也可用来表示coordinator当前的phase
    1. type taskType int
    2. const (
    3.     // map任务
    4.         mapType taskType = iota
    5.     // reduce任务
    6.         reduceType
    7.     // 当前没有空闲任务,请等待
    8.     waitting
    9.     // 已经完成全部任务,可以退出了
    10.         done
    11. )
    复制代码
  • 定义拉取任务RPC的args和reply struct
    CallTaskArgs中不需要包含变量,只需要让coordinator知道该worker正在请求一个任务,coordinator会随机选择空闲任务进行分配填入CallTaskReply中
    CallTaskReply包含以下变量:

    • FileName:map阶段,worker需要知道具体的文件名才能解析该文件
    • tp:指示该任务的具体类型
    • TaskID:worker将该变量放入CallTaskDoneArgs中,coordinator可以迅速定位Task[TaskID],并且在reduce阶段中,搭配nFiles变量,worker读取mr-0-TaskID、mr-1-TaskID....mr-nFiles-1-TaskID文件
    • nFiles:初始文件的数量,用于搭配TaskID,在上面已介绍
    • nReduce:用于map阶段,ihash(key) % nReduce
    1. type CallTaskArgs struct {
    2. }
    3. type CallTaskReply struct {
    4.         FileName string
    5.         TaskID   int
    6.         tp       taskType
    7.         nFiles   int
    8.         nReduce  int
    9. }
    复制代码
  • 定义报告任务完成RPC的args和reply struct
    TaskID变量作用在CallTaskReply: TaskID 中提及
    tp的作用是用于coordinator判断该RPC是否是合法的,举例:worker-1成功请求到map-1任务,但是因为worker-1节点硬件问题处理缓慢而导致coordinator检测到该map-1任务超时,于是把map-1任务分配给了worker-2。等到某个时间点,已经完成所有map任务,coordinator进入到了reduce阶段,但此时worker-1节点才刚运行完map-1任务并报告给coordinator,coordinator检测到当前是reduce阶段,但收到报告完成的rpc是map类型,不会对其进行任何操作。
    1. type CallTaskDoneArgs struct {
    2.         TaskID int
    3.         tp     taskType
    4. }
    5. type CallTaskDoneReply struct {
    6. }
    复制代码
3.2 Coordinator

3.2.1 结构体设计
  1. type taskState int
  2. const (
  3.         spare taskState = iota
  4.         executing
  5.         finish
  6. )
  7. type task struct {
  8.         fileName string
  9.         id       int
  10.         state    taskState
  11.         start    time.Time
  12. }
复制代码
首先设计一个task struct,该结构体代表一个任务

  • filename:在map阶段,用于coordinator告知worker要读取的初始文件
  • id: 该任务的id,传给worker,作用在RPC设计中提及
  • state:任务有三个状态:空闲、执行中、已完成。若空闲则可以分配给worker;若执行中,则监视该任务是否超时
  • start:任务刚开始执行的时间
  1. type Coordinator struct {
  2.         // Your definitions here.
  3.         mu         sync.Mutex
  4.         state      taskType
  5.         tasks      []*task
  6.         mapChan    chan *task
  7.         reduceChan chan *task
  8.         nReduce    int
  9.         nFiles     int
  10.         finished   int
  11. }
复制代码
接着设计主要Coordinator结构体

  • state:当前系统的状态,map阶段(分配map任务)、reduce阶段(分配reduce任务)、全部完成done(可以结束系统运行)
  • tasks: *task的切片,维护了一组任务
  • mapChan、reduceChan:用于分发map、reduce任务的channel。map阶段,若有空闲map任务,则放至channel中,当有worker请求任务时,则可取出来。reduce阶段同理
  • finished:当前已完成任务的数量。map阶段,若finished == nFiles,则表示所有map任务完成,可以进入reduce阶段。reduce阶段同理,进入done
3.2.2 初始化

[code]func MakeCoordinator(files []string, nReduce int) *Coordinator {        c := Coordinator{}        // Your code here.        c.mapPhase(files, nReduce)        go c.watch()        c.server()        return &c}func (c *Coordinator) mapPhase(files []string, nReduce int) {        c.state = mapType                 //设置系统状态为map阶段        c.nReduce = nReduce                c.nFiles = len(files)        c.tasks = make([]*task, c.nFiles)        c.mapChan = make(chan *task, c.nFiles) // c.nFiles长度的map channel        for i := 0; i < c.nFiles; i++ {                c.tasks = &task{fileName: files, id: i}                c.mapChan  0 {                case true:                        task :=  0 {                case true:                        task :=  0,则取出一个task,调用c.setReply(task, reply),将任务的相关信息填入reply中,并把task的当前状态设为执行中,开始时间设为time.Now()。如果没有可分配的任务,则设reply.Tp = waitting,让worker等待一会再请求任务若当前系统状态为reduce阶段:同上
</ol>3.2.4 任务完成

[code]func (c *Coordinator) CallTaskDone(args *CallTaskDoneArgs, reply *CallTaskDoneReply) error {        c.mu.Lock()        defer c.mu.Unlock()        if c.state != args.Tp || c.state == done {                return nil        }        if c.tasks[args.TaskID].state != finish {                c.tasks[args.TaskID].state = finish                c.finished++                //fmt.Printf("task %v done\n", args.TaskID)                if c.state == mapType && c.finished == c.nFiles {                        c.reducePhase()                } else if c.state == reduceType && c.finished == c.nReduce {                        close(c.reduceChan)                        c.state = done                }        }        return nil}func (c *Coordinator) reducePhase() {        //fmt.Printf("reduce phase\n")        close(c.mapChan)        c.state = reduceType        c.tasks = make([]*task, c.nReduce)        c.finished = 0        c.reduceChan = make(chan *task, c.nReduce)        for i := 0; i < c.nReduce; i++ {                c.tasks = &task{id: i}                c.reduceChan  timeout {                                task.state = spare                                switch c.state {                                case mapType:                                        c.mapChan

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

河曲智叟

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表