Data race when running my MapReduce framework

1. Environment

Go version: go1.21.4 linux/amd64
OS version: Ubuntu 20.04.6 LTS
Architecture: x86_64

2. Problem description

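When I run my own implementation of the MapReduce framework (lab 1 of MIT 6.824) to do a word count, the race detector reports a data race. I have added a lot of locks, but a read/write race still occurs.
Here is the report: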

root@ubuntu:/home/sdr/go_project/6.824/src/main# ./mrcoordinator pg-being_ernest.txt pg-huckleberry_finn.txt pg-dorian_gray.txt pg-metamorphosis.txt pg-frankenstein.txt pg-sherlock_holmes.txt pg-grimm.txt pg-tom_sawyer.txt
==================
WARNING: DATA RACE
Write at 0x00c000027178 by goroutine 131:
  6.824/mr.(*Coordinator).DistrbuteTask()
      /home/sdr/go_project/6.824/src/mr/coordinator.go:234 +0x40e
  runtime.call32()
      /opt/go/src/runtime/asm_amd64.s:748 +0x42
  reflect.Value.Call()
      /opt/go/src/reflect/value.go:380 +0xb5
  net/rpc.(*service).call()
      /opt/go/src/net/rpc/server.go:382 +0x26c
  net/rpc.(*Server).ServeCodec.func2()
      /opt/go/src/net/rpc/server.go:479 +0x15e

Previous read at 0x00c000027178 by goroutine 7:
  6.824/mr.(*Coordinator).loopRemoveTimeOutTask()
      /home/sdr/go_project/6.824/src/mr/coordinator.go:60 +0x44
  6.824/mr.MakeCoordinator.func1()
      /home/sdr/go_project/6.824/src/mr/coordinator.go:104 +0x33

Goroutine 131 (running) created at:
  net/rpc.(*Server).ServeCodec()
      /opt/go/src/net/rpc/server.go:479 +0x64c
  net/rpc.(*Server).ServeConn()
      /opt/go/src/net/rpc/server.go:454 +0x624
  net/rpc.(*Server).ServeHTTP()
      /opt/go/src/net/rpc/server.go:709 +0x564
  net/http.(*ServeMux).ServeHTTP()
      /opt/go/src/net/http/server.go:2514 +0xbc
  net/http.serverHandler.ServeHTTP()
      /opt/go/src/net/http/server.go:2938 +0x2a1
  net/http.(*conn).serve()
      /opt/go/src/net/http/server.go:2009 +0xc24
  net/http.(*Server).Serve.func3()
      /opt/go/src/net/http/server.go:3086 +0x4f

Goroutine 7 (running) created at:
  6.824/mr.MakeCoordinator()
      /home/sdr/go_project/6.824/src/mr/coordinator.go:104 +0x1a6
  main.main()
      /home/sdr/go_project/6.824/src/main/mrcoordinator.go:26 +0xba
==================
FINISH.
Found 1 data race(s)

Below is the code for the methods and structs mentioned in the report above:



type Coordinator struct {
    Id                int // ID handed out to tasks and workers; used to verify whether a task result is accepted
    Phase             int // records whether the job is in the map phase or the reduce phase
    MapTaskChannel    chan *Task
    ReduceTaskChannel chan *Task
    MapTaskNum        int
    ReduceTaskNum     int
    Lock              sync.Mutex
    IsDone            bool // used to decide whether the program should exit
}


type Task struct {
    Taskid   int
    TaskType int
    Nreduce  int
    Files    []string
    Begin    time.Time
    Status   int
}


// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", "localhost:1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)

}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
    c.Lock.Lock()
    ret := c.IsDone
    c.Lock.Unlock()
    return ret
}

// repeatedly check whether any task has timed out
func (c *Coordinator) loopRemoveTimeOutTask() {
    for !c.Done() {
        c.Lock.Lock()
        switch c.Phase {
        case MapPhase:
            for i := 0; i < c.MapTaskNum; i++ {
                task := <-c.MapTaskChannel
                c.MapTaskChannel <- task
                task.checkTimeOutTask()
            }
        case ReducePhase:
            for i := 0; i < c.ReduceTaskNum; i++ {
                task := <-c.ReduceTaskChannel
                c.ReduceTaskChannel <- task
                task.checkTimeOutTask()
            }
        }
        c.Lock.Unlock()
        time.Sleep(3 * time.Second)
    }

}

func (task *Task) checkTimeOutTask() {
    if task.Status == Running && time.Since(task.Begin) > 10*time.Second {
        task.Status = Ready
    }
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        Id:                0, // Id starts at zero and keeps incrementing to produce unique task and worker IDs
        Phase:             MapPhase,
        MapTaskNum:        len(files),
        ReduceTaskNum:     nReduce,
        MapTaskChannel:    make(chan *Task, len(files)),
        ReduceTaskChannel: make(chan *Task, nReduce),
        IsDone:            false,
        Lock:              sync.Mutex{},
    }
    c.generateMapTask(files)

    go c.loopRemoveTimeOutTask()
    c.server()
    return &c
}

// generate the map tasks and put them into the map task channel queue
func (c *Coordinator) generateMapTask(files []string) {
    taskarray := make([]Task, len(files))
    c.Lock.Lock()
    for i, file := range files {
        taskarray[i] = Task{
            Taskid:   -1,
            TaskType: MapTask,
            Files:    []string{file},
            Status:   Ready,
        }
        c.MapTaskChannel <- &taskarray[i]
    }
    c.Lock.Unlock()
}

// generate the reduce tasks and put them into the reduce task channel queue
func (c *Coordinator) generateReducTask() {
    taskarray := make([]Task, c.ReduceTaskNum)
    c.Lock.Lock()
    for i := 0; i < c.ReduceTaskNum; i++ {
        taskarray[i] = Task{
            Taskid:   -1,
            TaskType: ReduceTask,
            Files:    []string{fmt.Sprintf("mr-tmp-*-%v", i)}, // use a wildcard to match every map output file for this reduce ID
            Status:   Ready,
        }
        c.ReduceTaskChannel <- &taskarray[i]
    }
    c.Lock.Unlock()
}

// check whether all map tasks have finished
func (c *Coordinator) maptaskallfinish() bool {
    c.Lock.Lock()
    for i := 0; i < c.MapTaskNum; i++ {
        task := <-c.MapTaskChannel
        c.MapTaskChannel <- task
        if task.Status == Ready || task.Status == Running {
            c.Lock.Unlock()
            return false
        }
    }
    c.Lock.Unlock()
    return true
}

// check whether all reduce tasks have finished
func (c *Coordinator) reducetaskallfinish() bool {
    c.Lock.Lock()
    for i := 0; i < c.ReduceTaskNum; i++ {
        task := <-c.ReduceTaskChannel
        c.ReduceTaskChannel <- task
        if task.Status == Ready || task.Status == Running {
            c.Lock.Unlock()
            return false
        }
    }
    c.Lock.Unlock()
    return true
}

func (c *Coordinator) getphase() int {
    c.Lock.Lock()
    phase := c.Phase
    c.Lock.Unlock()
    return phase
}

// a worker calls this function via RPC to get a task
func (c *Coordinator) DistrbuteTask(requesttaskargs *RequestTaskArgs, requesttaskreply *RequestTaskReply) error {
    if c.getphase() == MapPhase {
        c.Lock.Lock()
        for i := 0; i < c.MapTaskNum; i++ {
            task := <-c.MapTaskChannel
            c.MapTaskChannel <- task
            if task.Status == Ready {
                t := time.Now()
                requesttaskreply.Task = Task{
                    Taskid:   c.Id,
                    TaskType: MapTask,
                    Nreduce:  c.ReduceTaskNum,
                    Files:    task.Files,
                    Status:   Running,
                }
                requesttaskreply.Done = false

                task.Taskid = c.Id
                task.Begin = t
                task.Status = Running
                c.Id++
                c.Lock.Unlock()
                return nil
            }
        }
        c.Lock.Unlock()
        if c.maptaskallfinish() {
            c.Lock.Lock()
            c.Phase = ReducePhase
            c.Lock.Unlock()
            c.generateReducTask()
        } else {
            time.Sleep(3 * time.Second)
        }
    } else {
        c.Lock.Lock()
        for i := 0; i < c.ReduceTaskNum; i++ {
            task := <-c.ReduceTaskChannel
            c.ReduceTaskChannel <- task
            if task.Status == Ready {
                t := time.Now()
                requesttaskreply.Task = Task{
                    Taskid:   c.Id,
                    TaskType: ReduceTask,
                    Nreduce:  c.ReduceTaskNum,
                    Files:    task.Files,
                    Status:   Running,
                }
                requesttaskreply.Done = false

                task.Taskid = c.Id
                task.Begin = t
                task.Status = Running
                c.Id++ //+++++++++++++++++++++++++++++++++
                c.Lock.Unlock()
                return nil
            }
        }
        c.Lock.Unlock()
        if c.reducetaskallfinish() {
            c.Lock.Lock()
            c.Phase = Finish
            c.IsDone = true
            c.Lock.Unlock()
            requesttaskreply.Done = true
            return nil
        } else {
            time.Sleep(3 * time.Second)
        }

    }
    return nil
}

// a worker calls this function via RPC to report the result of a finished map task
func (c *Coordinator) ReceivedMapTask(maptaskdoneargs *MapTaskDoneArgs, maptaskdonereply *MapTaskDoneReply) error {
    c.Lock.Lock()
    for i := 0; i < c.MapTaskNum; i++ {
        task := <-c.MapTaskChannel
        c.MapTaskChannel <- task
        task.checkTimeOutTask()
        if task.Status == Running && task.Taskid == maptaskdoneargs.Task.Taskid {
            err := receivedfiles(maptaskdoneargs.Files)
            if err != nil {
                log.Fatal("copy file error(map): ", err)
                c.Lock.Unlock()
                return nil
            }
            task.Status = Finish
            c.Lock.Unlock()
            return nil
        }
    }
    c.Lock.Unlock()
    return nil
}

// a worker calls this function via RPC to report the result of a finished reduce task
func (c *Coordinator) ReceivedReduceTask(reducetaskdoneargs *ReduceTaskDoneArgs, reducetaskdonereply *ReduceTaskDoneReply) error {
    c.Lock.Lock()
    for i := 0; i < c.ReduceTaskNum; i++ {
        task := <-c.ReduceTaskChannel
        c.ReduceTaskChannel <- task
        task.checkTimeOutTask()
        if task.Status == Running && task.Taskid == reducetaskdoneargs.Task.Taskid {
            err := receivedfiles(reducetaskdoneargs.Files)
            if err != nil {
                log.Fatal("copy file error(reduce): ", err)
                c.Lock.Unlock()
                return nil
            }
            task.Status = Finish
            c.Lock.Unlock()
            return nil
        }
    }
    c.Lock.Unlock()
    return nil
}

// copy the temporary files from /var/tmp into the current directory, which marks the task result as received
func receivedfiles(files []string) error {
    for _, filename := range files {
        srcf, e := os.Open(Tempfilepath + filename)
        if e != nil {
            return e
        }
        defer srcf.Close()
        destf, er := os.Create(filename)
        if er != nil {
            return er
        }
        defer destf.Close()
        _, err := io.Copy(destf, srcf)
        if err != nil {
            return err
        }
    }
    return nil
}


3. What result did you expect?

I would like to fix the data race, so that running the program no longer produces a race report.

4. What result did you actually get?

The error output is in the problem description above.

Replies: 11

There isn't enough information to tell. What is c.generateMapTask(files)? What does the task struct look like? And c.server()? :joy:

4 months ago

OK, I have now posted all of the code in the coordinator package and added the Task and Coordinator structs.

4 months ago

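If you need any other information, please let me know. I haven't posted the code from the worker package yet, but my initial judgment is that this race does not involve the other package.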

4 months ago

I can't tell :sweat_smile:. Take a look at these two lines.

[image]

4 months ago

Line 60 is shown in this screenshot: [image]. Line 234 is the line I have just marked with '//+++++++++++++++++++++++++++++++++' in the code above.

4 months ago

And the function just before line 60 is Done().

4 months ago

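I can't tell from this. I'd suggest splitting the map and reduce tasks into separate groups by state (idle tasks, executing tasks, and finished tasks) and keeping each stage separate, which reduces the chance of a data race. Also, it sometimes helps to use time.Sleep() where appropriate.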

4 months ago

OK, thanks for the suggestion.

4 months ago
yourself

Writing Unlock calls inside nested conditional branches... that makes the code pretty painful to read.

3 months ago
Jacky2021 3 months ago
yourself (author) 3 months ago
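
The point above about Unlock calls scattered through nested branches can be illustrated with `defer`. The sketch below is not the poster's code: `coordinator`, `nextTask`, `advanceLocked`, `finishTask`, and `isDone` are hypothetical names, and task state is assumed to live in a slice guarded by one mutex. Each method locks once at the top, and the check for "all tasks finished" and the phase change happen in the same critical section.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	mapPhase = iota
	reducePhase
	donePhase
)

const (
	ready = iota // zero value, so a fresh status slice is all ready
	running
	finished
)

// coordinator is a hypothetical, simplified stand-in for the one in the post.
type coordinator struct {
	mu      sync.Mutex
	phase   int
	nReduce int
	status  []int // status of each task in the current phase
}

// nextTask marks one ready task as running and returns its index.
// ok is false when nothing can be handed out right now.
func (c *coordinator) nextTask() (idx int, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock() // runs on every return path below

	if c.phase == donePhase {
		return 0, false
	}
	allFinished := true
	for i, s := range c.status {
		switch s {
		case ready:
			c.status[i] = running
			return i, true
		case running:
			allFinished = false
		}
	}
	if allFinished {
		c.advanceLocked()
	}
	return 0, false
}

// advanceLocked moves to the next phase; the caller must hold c.mu.
func (c *coordinator) advanceLocked() {
	switch c.phase {
	case mapPhase:
		c.phase = reducePhase
		c.status = make([]int, c.nReduce)
	case reducePhase:
		c.phase = donePhase
		c.status = nil
	}
}

// finishTask marks a running task as finished.
func (c *coordinator) finishTask(idx int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if idx >= 0 && idx < len(c.status) && c.status[idx] == running {
		c.status[idx] = finished
	}
}

func (c *coordinator) isDone() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.phase == donePhase
}

func main() {
	c := &coordinator{nReduce: 1, status: make([]int, 2)}
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for !c.isDone() {
				if idx, ok := c.nextTask(); ok {
					c.finishTask(idx)
					continue
				}
				time.Sleep(time.Millisecond) // nothing to do yet; poll again
			}
		}()
	}
	wg.Wait()
	fmt.Println("done:", c.isDone())
}
```

Because the phase only changes while the lock is held and only after every task is seen as finished, two concurrent callers cannot both trigger the same transition. Whether this removes the race reported in the post is not something the sketch can show; it only demonstrates the locking structure.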
