Data race when running my MapReduce framework
1. Environment
Go version: go1.21.4 linux/amd64
OS: Ubuntu 20.04.6 LTS
Architecture: x86_64
2. Problem description
When I run my own MapReduce framework (MIT 6.824 Lab 1) to do a word count, the race detector reports a data race. I have added plenty of locks, but a read/write race still occurs.
Here is the error output:
root@ubuntu:/home/sdr/go_project/6.824/src/main# ./mrcoordinator pg-being_ernest.txt pg-huckleberry_finn.txt pg-dorian_gray.txt pg-metamorphosis.txt pg-frankenstein.txt pg-sherlock_holmes.txt pg-grimm.txt pg-tom_sawyer.txt
==================
WARNING: DATA RACE
Write at 0x00c000027178 by goroutine 131:
6.824/mr.(*Coordinator).DistrbuteTask()
/home/sdr/go_project/6.824/src/mr/coordinator.go:234 +0x40e
runtime.call32()
/opt/go/src/runtime/asm_amd64.s:748 +0x42
reflect.Value.Call()
/opt/go/src/reflect/value.go:380 +0xb5
net/rpc.(*service).call()
/opt/go/src/net/rpc/server.go:382 +0x26c
net/rpc.(*Server).ServeCodec.func2()
/opt/go/src/net/rpc/server.go:479 +0x15e
Previous read at 0x00c000027178 by goroutine 7:
6.824/mr.(*Coordinator).loopRemoveTimeOutTask()
/home/sdr/go_project/6.824/src/mr/coordinator.go:60 +0x44
6.824/mr.MakeCoordinator.func1()
/home/sdr/go_project/6.824/src/mr/coordinator.go:104 +0x33
Goroutine 131 (running) created at:
net/rpc.(*Server).ServeCodec()
/opt/go/src/net/rpc/server.go:479 +0x64c
net/rpc.(*Server).ServeConn()
/opt/go/src/net/rpc/server.go:454 +0x624
net/rpc.(*Server).ServeHTTP()
/opt/go/src/net/rpc/server.go:709 +0x564
net/http.(*ServeMux).ServeHTTP()
/opt/go/src/net/http/server.go:2514 +0xbc
net/http.serverHandler.ServeHTTP()
/opt/go/src/net/http/server.go:2938 +0x2a1
net/http.(*conn).serve()
/opt/go/src/net/http/server.go:2009 +0xc24
net/http.(*Server).Serve.func3()
/opt/go/src/net/http/server.go:3086 +0x4f
Goroutine 7 (running) created at:
6.824/mr.MakeCoordinator()
/home/sdr/go_project/6.824/src/mr/coordinator.go:104 +0x1a6
main.main()
/home/sdr/go_project/6.824/src/main/mrcoordinator.go:26 +0xba
==================
FINISH.
Found 1 data race(s)
Here are the implementations of the methods and structs mentioned in the report above:
type Coordinator struct {
Id int //ID handed out to tasks and workers; used to verify whether a task's result should be accepted
Phase int //whether the job is currently in the map phase or the reduce phase
MapTaskChannel chan *Task
ReduceTaskChannel chan *Task
MapTaskNum int
ReduceTaskNum int
Lock sync.Mutex
IsDone bool //used to decide when the program should shut down
}
type Task struct {
Taskid int
TaskType int
Nreduce int
Files []string
Begin time.Time
Status int
}
// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", "localhost:1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
c.Lock.Lock()
ret := c.IsDone
c.Lock.Unlock()
return ret
}
// Loop to check whether any task has timed out
func (c *Coordinator) loopRemoveTimeOutTask() {
for !c.Done() {
c.Lock.Lock()
switch c.Phase {
case MapPhase:
for i := 0; i < c.MapTaskNum; i++ {
task := <-c.MapTaskChannel
c.MapTaskChannel <- task
task.checkTimeOutTask()
}
case ReducePhase:
for i := 0; i < c.ReduceTaskNum; i++ {
task := <-c.ReduceTaskChannel
c.ReduceTaskChannel <- task
task.checkTimeOutTask()
}
}
c.Lock.Unlock()
time.Sleep(3 * time.Second)
}
}
func (task *Task) checkTimeOutTask() {
if task.Status == Running && time.Since(task.Begin) > 10*time.Second {
task.Status = Ready
}
}
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
Id: 0, //Id starts at zero and keeps incrementing, generating unique task and worker IDs
Phase: MapPhase,
MapTaskNum: len(files),
ReduceTaskNum: nReduce,
MapTaskChannel: make(chan *Task, len(files)),
ReduceTaskChannel: make(chan *Task, nReduce),
IsDone: false,
Lock: sync.Mutex{},
}
c.generateMapTask(files)
go c.loopRemoveTimeOutTask()
c.server()
return &c
}
// Generate the map tasks and put them into the map task channel queue
func (c *Coordinator) generateMapTask(files []string) {
taskarray := make([]Task, len(files))
c.Lock.Lock()
for i, file := range files {
taskarray[i] = Task{
Taskid: -1,
TaskType: MapTask,
Files: []string{file},
Status: Ready,
}
c.MapTaskChannel <- &taskarray[i]
}
c.Lock.Unlock()
}
// Generate the reduce tasks and put them into the reduce task channel queue
func (c *Coordinator) generateReducTask() {
taskarray := make([]Task, c.ReduceTaskNum)
c.Lock.Lock()
for i := 0; i < c.ReduceTaskNum; i++ {
taskarray[i] = Task{
Taskid: -1,
TaskType: ReduceTask,
Files: []string{fmt.Sprintf("mr-tmp-*-%v", i)}, //use a wildcard to match every map output file for this reduce id
Status: Ready,
}
c.ReduceTaskChannel <- &taskarray[i]
}
c.Lock.Unlock()
}
// Check whether all map tasks have finished
func (c *Coordinator) maptaskallfinish() bool {
c.Lock.Lock()
for i := 0; i < c.MapTaskNum; i++ {
task := <-c.MapTaskChannel
c.MapTaskChannel <- task
if task.Status == Ready || task.Status == Running {
c.Lock.Unlock()
return false
}
}
c.Lock.Unlock()
return true
}
// Check whether all reduce tasks have finished
func (c *Coordinator) reducetaskallfinish() bool {
c.Lock.Lock()
for i := 0; i < c.ReduceTaskNum; i++ {
task := <-c.ReduceTaskChannel
c.ReduceTaskChannel <- task
if task.Status == Ready || task.Status == Running {
c.Lock.Unlock()
return false
}
}
c.Lock.Unlock()
return true
}
func (c *Coordinator) getphase() int {
c.Lock.Lock()
phase := c.Phase
c.Lock.Unlock()
return phase
}
// Workers call this function over RPC to request a task
func (c *Coordinator) DistrbuteTask(requesttaskargs *RequestTaskArgs, requesttaskreply *RequestTaskReply) error {
if c.getphase() == MapPhase {
c.Lock.Lock()
for i := 0; i < c.MapTaskNum; i++ {
task := <-c.MapTaskChannel
c.MapTaskChannel <- task
if task.Status == Ready {
t := time.Now()
requesttaskreply.Task = Task{
Taskid: c.Id,
TaskType: MapTask,
Nreduce: c.ReduceTaskNum,
Files: task.Files,
Status: Running,
}
requesttaskreply.Done = false
task.Taskid = c.Id
task.Begin = t
task.Status = Running
c.Id++
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
if c.maptaskallfinish() {
c.Lock.Lock()
c.Phase = ReducePhase
c.Lock.Unlock()
c.generateReducTask()
} else {
time.Sleep(3 * time.Second)
}
} else {
c.Lock.Lock()
for i := 0; i < c.ReduceTaskNum; i++ {
task := <-c.ReduceTaskChannel
c.ReduceTaskChannel <- task
if task.Status == Ready {
t := time.Now()
requesttaskreply.Task = Task{
Taskid: c.Id,
TaskType: ReduceTask,
Nreduce: c.ReduceTaskNum,
Files: task.Files,
Status: Running,
}
requesttaskreply.Done = false
task.Taskid = c.Id
task.Begin = t
task.Status = Running
c.Id++ //+++++++++++++++++++++++++++++++++
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
if c.reducetaskallfinish() {
c.Lock.Lock()
c.Phase = Finish
c.IsDone = true
c.Lock.Unlock()
requesttaskreply.Done = true
return nil
} else {
time.Sleep(3 * time.Second)
}
}
return nil
}
// A worker that finishes a map task calls this function over RPC to report the result
func (c *Coordinator) ReceivedMapTask(maptaskdoneargs *MapTaskDoneArgs, maptaskdonereply *MapTaskDoneReply) error {
c.Lock.Lock()
for i := 0; i < c.MapTaskNum; i++ {
task := <-c.MapTaskChannel
c.MapTaskChannel <- task
task.checkTimeOutTask()
if task.Status == Running && task.Taskid == maptaskdoneargs.Task.Taskid {
err := receivedfiles(maptaskdoneargs.Files)
if err != nil {
log.Fatal("copy file error(map): ", err)
c.Lock.Unlock()
return nil
}
task.Status = Finish
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
return nil
}
// A worker that finishes a reduce task calls this function over RPC to report the result
func (c *Coordinator) ReceivedReduceTask(reducetaskdoneargs *ReduceTaskDoneArgs, reducetaskdonereply *ReduceTaskDoneReply) error {
c.Lock.Lock()
for i := 0; i < c.ReduceTaskNum; i++ {
task := <-c.ReduceTaskChannel
c.ReduceTaskChannel <- task
task.checkTimeOutTask()
if task.Status == Running && task.Taskid == reducetaskdoneargs.Task.Taskid {
err := receivedfiles(reducetaskdoneargs.Files)
if err != nil {
log.Fatal("copy file error(reduce): ", err)
c.Lock.Unlock()
return nil
}
task.Status = Finish
c.Lock.Unlock()
return nil
}
}
c.Lock.Unlock()
return nil
}
// Copy the temporary files from /var/tmp into the current directory, marking the task's output as accepted
func receivedfiles(files []string) error {
for _, filename := range files {
srcf, e := os.Open(Tempfilepath + filename)
if e != nil {
return e
}
defer srcf.Close()
destf, er := os.Create(filename)
if er != nil {
return er
}
defer destf.Close()
_, err := io.Copy(destf, srcf)
if err != nil {
return err
}
}
return nil
}
3. Expected result
I want to fix the data race, i.e. run the program without the race detector reporting anything.
4. Actual result
The error output is shown in the problem description above.
There isn't enough information here to tell. What is c.generateMapTask(files)? What does the Task struct look like? And c.server()? :joy:
OK, I have now posted all of the code in the coordinator package, including the Task and Coordinator structs.
If you need anything else, let me know. I haven't posted the code in the worker package yet, but my initial judgment is that this race doesn't involve that package.
Still can't tell :sweat_smile:. Take a look at those two lines.
Line 60 is shown in the screenshot below.
And line 234 is the line I marked above with '//+++++++++++++++++++++++++++++++++'.
The func right before line 60 is Done().
This doesn't show much either. My suggestion: split the map and reduce tasks into separate collections, e.g. idle tasks, executing tasks, and finished tasks, and handle each phase separately, which reduces the chances of a data race. Also, use time.Sleep() judiciously, only where appropriate. A rough sketch of that layout is below.
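For illustration, here is a minimal sketch of that suggestion, assuming the Task type and the Ready/Running status constants from the code above; the names stateCoordinator, mu, idle, executing, finished, and takeIdleTask are hypothetical, not part of the lab skeleton:

// One mutex guards three collections, one per task state, so a state
// transition is a move between collections under the same lock.
type stateCoordinator struct {
	mu        sync.Mutex
	idle      []*Task       // tasks waiting to be handed out
	executing map[int]*Task // Taskid -> task currently running on a worker; initialize with make before use
	finished  []*Task       // tasks whose results have been accepted
}

// takeIdleTask hands out one idle task and moves it into the executing set.
// The deferred Unlock covers both return paths.
func (c *stateCoordinator) takeIdleTask(id int) (*Task, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.idle) == 0 {
		return nil, false
	}
	t := c.idle[0]
	c.idle = c.idle[1:]
	t.Taskid = id
	t.Begin = time.Now()
	t.Status = Running
	c.executing[t.Taskid] = t
	return t, true
}

With this layout, the timeout loop only scans the executing map under mu and moves timed-out tasks back to idle, instead of draining and refilling a channel while holding the lock.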
OK, thanks for the suggestion.
Unlock calls scattered through nested conditional branches... reading this code is a real treat. See the sketch below for the usual deferred-Unlock alternative.
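For example, here is the map branch of DistrbuteTask restructured around a single deferred Unlock. The helper name distributeMapTask is hypothetical, and this is a sketch rather than a drop-in replacement, since it holds the lock for the whole body:

// Acquire the lock once and let defer release it on every return path,
// instead of calling Unlock inside each nested branch.
func (c *Coordinator) distributeMapTask(reply *RequestTaskReply) error {
	c.Lock.Lock()
	defer c.Lock.Unlock() // runs on every return below
	for i := 0; i < c.MapTaskNum; i++ {
		task := <-c.MapTaskChannel
		c.MapTaskChannel <- task
		if task.Status != Ready {
			continue
		}
		reply.Task = Task{
			Taskid:   c.Id,
			TaskType: MapTask,
			Nreduce:  c.ReduceTaskNum,
			Files:    task.Files,
			Status:   Running,
		}
		reply.Done = false
		task.Taskid = c.Id
		task.Begin = time.Now()
		task.Status = Running
		c.Id++
		return nil
	}
	return nil
}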