ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

mit6.824lab1

2022-02-20 22:30:34  阅读:219  来源: 互联网

标签:map string worker reduce lab1 mutex reply mit6.824


环境

指导书,认真看 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
要求golang1.15 那就用那个版本
我开始尝试使用1.17发现gobuild不通过
在windows的goland上编代码, 在远程linux服务器(或者虚拟机)上执行。没有mac的可以尝试远程目录挂载。
windows挂载远程目录
实验是做完了,这里写一写过程,整理一下思路
在这里插入图片描述

过程

  1. 首先认真读了mapreduce的论文,因为以前本科也上过课,研一也上过,所以对于mr的过程还是算熟悉。

  2. 认真读指导书

  3. 按指导书跑一遍 sequencial-mr的例子

  4. 根据指导书和论文,需要做的事情是实现coordinator和worker的通信,(blacklive matters!)这里没用master了。coordinator里面用一些数据结构mutex或者channel控制不同worker的互斥访问,控制map任务和reduce任务的状态,map任务产生的中间文件的位置等。根据论文里面说的3种状态,Idle,inprogress,complete.这里我用了比较简单的数据结构。因为这个实验的输入文件都不是很大,不像gfs实验中需要split为16-64MB,所以直接一个map任务对应一个输入文件,因此将文件名作为了任务名,map的key。reduce任务名则根据mrcoordinator.go指定的nreduce数量决定,用数字代替。TaskStatus是枚举,三种任务状态。结构比较简单,刚开始想着要是数据结构不够用后面再加,但是好像做下来都没改过。

修改这三个文件
在这里插入图片描述

type Coordinator struct {
	mapTasks       map[string]TaskStatus
	reduceTasks    map[string]TaskStatus
	reduceFiles    map[string][]string //中间文件位置
	mapCompleted   bool 
	MapCompleteNum int //完成了多少个map
	nReduce        int 
	done bool
	mutex sync.Mutex
}

master初始化

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		map[string]TaskStatus{},
		map[string]TaskStatus{},
		map[string][]string{},
		false,
		0,
		nReduce,
		false,
		sync.Mutex{},
	}

	// Your code here.
	for _, fileName := range files {
		c.mapTasks[fileName] = Idle
	}
	for i := 0; i < nReduce; i++ {
		c.reduceTasks[strconv.Itoa(i)] = Idle
		c.reduceFiles[strconv.Itoa(i)] = []string{}
	}

	c.server()
	return &c
}

worker申请任务

他给的demo里面有example rpc调用的例子,在rpc里定义自己的请求体和响应体的结构,这也是rpc通信的风格。grpc开发app就是先定义protobuf消息格式。
我做的过程中定义的函数没有返回error,好像不行,需要保持和example一样。
coordinator(master)分配任务的同时,给对应的任务加一个过期时间,实验要求的是10s。
超时处理的时候,刚开始没看指导书的hints,后来发现里面建议使用ioutils.TempFile,因为测试过程中会测试当某个worker crash掉的时候,程序会怎么办,A worker超时了,分给另一个worker B,A写了一半的文件就失效了,因此应该用临时文件,最后返回的时候写入成功了才改成本应的文件名。
这里map的中间产物我命名很随意,直接文件名*n,n代表作为哪一个reduce的输入,n是由key 哈希后 取模得到的。


func (c *Coordinator) GetTask(request *ExampleArgs, reply *GetTaskReply) error {

	c.mutex.Lock()
	defer c.mutex.Unlock()

	if !c.mapCompleted { //分配map任务
		for fileName, status := range c.mapTasks {
			if status == Idle {
				c.mapTasks[fileName] = InProgress
				reply.MapName = fileName
				reply.NReduce = c.nReduce
				reply.TaskType = MapType
				go c.HandleMapTimeout(fileName)
				return nil
			}
		}
		reply.TaskType = Sleep

	} else {

		for reduceName, status := range c.reduceTasks {
			if status == Idle{
				c.reduceTasks[reduceName] = InProgress
				reply.ReduceFiles = c.reduceFiles[reduceName]
				reply.TaskType = ReduceType
				reply.ReduceName = reduceName
				go c.HandleReduceTimeout(reduceName)
				return nil
			}
		}
		reply.TaskType = Sleep
	}
	return nil

}

worker上报任务完成

map和reduce完成后同样rpc调用,报告master完成了任务,每次上报master都判断map阶段和reduce阶段是否完成,主程序中有一个判断整个程序是否完成的逻辑,done()

func (c *Coordinator) ReduceReport(req *ReduceCompleteReq,reply *ExampleReply) error{
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.reduceTasks[req.ReduceName] = Completed
	//fmt.Fprintf(os.Stderr, "####reduce完成 "+req.ReduceName+"\n")
	fmt.Fprintf(os.Stderr, "####reduce完成, "+req.ReduceName+"############### \n")

	done := true
	for _,status := range c.reduceTasks{
		if status != Completed{
			done = false
			break
		}
	}
	c.done = done
	return nil
}

差不多就这些,实现进程同步还可以用channel

遇到的两个坑就是,一个rpc调用函数需要返回error,另一个就是crash测试,如果不做临时文件处理,就可能通不过。

标签:map,string,worker,reduce,lab1,mutex,reply,mit6.824
来源: https://blog.csdn.net/qq_37053589/article/details/121979190

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有