ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

6.824 lab1 MapReduce

2022-02-20 21:34:55  阅读:207  来源: 互联网

标签:map 6.824 err reduce worker MapReduce lab1 kv reply


6.824 Lab-1 MapReduce

1.实验内容

1.1内容概述

将经典的Word Counter任务使用MapReduce编程范式去实现,任务整体流程如下(假设两个Map节点和两个Reduce节点):
image-20220220165329877

每个Map Worker负责一个输入文件的Map处理,每个Map任务输出N份文件(N是Reduce Worker数目),这N份文件会送到N个Reduce Worker处理。等待所有Map任务完成后,Reduce工作才能开始(此时所有输入文件才准备好),每个Reduce Worker输出一份reduce结果。

1.2lab相关代码概述

与本实验相关的主要有三个包,分别是main、mrapps、mr包。main包调用mrapps包和mr包运行整个流程,mrapps包是运行和测试时使用的工具函数包,这两个包在实验过程中都不需要改动,自己写的代码都在mr包中,其中包含三个文件coordinator.go、rpc.go和worker.go,作用如下

mr
├── coordinator.go \\Master
├── rpc.go		   \\处理通信
└── worker.go	   \\worker,包含map和reduce

2.实验步骤

2.1定义通信内容(rpc.go)

2.1.1分配任务时的请求与响应

worker请求任务时,不区分map和reduce,让coordinator根据任务完成情况来决定分配任务类型,这里有个边界情况就是所有的任务都在运行中,这个状态既得不到任务又不能直接退出,只能进行下一次任务。

// 节点请求任务
type ReqArgs struct {
	ReqNumber int8 //占用一个字节表示请求 为1表示申请任务
}
// Master回应任务内容
type ReqReply struct {
	TypeName string   // map or reduce or allinprogress
	Idx      int      //worker idx
	Content  []string //file names for work content
	NReduce  int      //reduce worker number
}

2.1.2完成任务时worker汇报任务结果给Master

汇报完成情况时,需要说明任务类型(TypeName), 任务结果(Ret), 完成的任务编号(idx)

// 报告任务完成情况
type FinishReq struct {
	TypeName string //map or reduce
	Ret      []string //output files(map or reduce)
	Idx      int //worker idx (map or reduce)
}

//Master 回应worker
type FinishReply struct {
	Done bool //for reply
}

2.2实现Worker(Worker.go)

2.2.1Map worker

Map worker任务包括三步,第一步读取输入文件调用mapf生成key value对,第二步处理kv对,排序之后合并相同条目到同一行,第三步将结果写入输出文件。

1.读取文件内容生成Key Value对
intermediate := []KeyValue{}
filename := reply1.Content[0]
NReduce := reply1.NReduce

//打开文件
file, err := os.Open(filename)
if err != nil {
	log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
	log.Fatalf("cannot read %v", filename)
}

file.Close()

//生成KV对
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
2.处理kv对生成中间结果
sort.Sort(ByKey(intermediate))
i := 0
reduceInput := make([][]ReduceKv, NReduce)
for i < len(intermediate) {
	j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
        j++
    }
    values := []string{}
    for k := i; k < j; k++ {
        values = append(values, intermediate[k].Value)
    }

    key := intermediate[i].Key

    //根据key分配到不同的reduce任务中
    idx := ihash(key) % NReduce
    reduceInput[idx] = append(reduceInput[idx], ReduceKv{Key: key, Value: values})

    i = j
}
3.写入结果

这个过程分为两步,首先写入临时文件,然后检查文件是否已经存在,如果不存在,将临时文件改名(原子操作),这样做的好处是防止一个任务被分配给不同worker时,多个worker同时写一个文件。

  1. 写入临时文件

    tempFiles := make([]*os.File, NReduce)
    //临时文件命名
    for i := range tempFiles {
    	tempFiles[i], err = ioutil.TempFile(".", "out*")
    	if err != nil {
    		log.Fatal("creat tempfile fail")
    	}
    }
    
    //对象以json格式写入临时文件
    for i := range reduceInput {
    	enc := json.NewEncoder(tempFiles[i])
    	for _, kv := range reduceInput[i] {
    		err := enc.Encode(&kv)
    		if err != nil {
    			log.Fatalf("cannot write json %v", i)
    		}
    	}
    }
    
  2. 重命名(原子操作)

//输入文件命名
for i := range outNames {
	outNames[i] = "mr-map-out-" + strconv.Itoa(reply1.Idx) + "-" + strconv.Itoa(i)
}
//rename
for i := range tempFiles {
	_, err := os.Stat(outNames[i])
	if os.IsNotExist(err) {
		os.Rename(tempFiles[i].Name(), outNames[i])
		if i == len(tempFiles)-1 {//complete work, call finish info
			args := FinishReq{TypeName: "map", Ret: outNames, Idx: reply1.Idx}
			reply := FinishReply{}
			call("Coordinator.HandFinishInfo", &args, &reply)
		}
	} else {
		os.Remove(tempFiles[i].Name()) //remove tempfile
		break
	}
}

2.2.2Reduce Worker

reduce 操作主要是读取map处理得到的文件,然后处理写入输出文件,写文件和map worker类似,先写入临时文件,完成之后再重命名

使用一个map来记录读取到的kv对,kvaMap := make(map[string]*ReduceKv)

1.读取中间文件(json格式)
for _, filename := range inputFileNames {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}

	dec := json.NewDecoder(file)

	for {
		var kv ReduceKv
		if err := dec.Decode(&kv); err != nil {
			break
		}
		if kvaMap[kv.Key] == nil {//map中没有记录,新建
			kvaMap[kv.Key] = &kv
		} else {//map中已经记录,追加
			kvaMap[kv.Key].Value = append(kvaMap[kv.Key].Value, kv.Value...)
		}
	}
}
2.写入临时文件
// 写入临时文件
for _, kv := range kvaMap {
   output := reducef(kv.Key, kv.Value)
   fmt.Fprintf(oTmpFile, "%v %v\n", kv.Key, output)
}
3.重命名,完成后汇报给Master
// 重命名临时文件
outName := "mr-out-" + strconv.Itoa(idx)
retName := []string{}
_, err := os.Stat(outName)
if os.IsNotExist(err) {
	os.Rename(oTmpFile.Name(), outName)
    //通知Master
	args := FinishReq{TypeName: "reduce", Ret: append(retName, outName), Idx: idx}
	reply := FinishReply{}
	call("Coordinator.HandFinishInfo", &args, &reply)
} else {
	os.Remove(oTmpFile.Name())
}

2.3实现Master (Coordinate.go)

Master负责记录任务的状态和分配任务,定义了两个数据结构如下:

type Task struct {
	inputFileName []string   //输入文件
	status        TaskStatus //任务状态
}

type Coordinator struct {
	MapTask    []Task
	ReduceTask []Task
	NReduce    int//记录reduce worker数目 关系到map输出的文件数
}

定义两个锁和两个全局变量

var coordinateLock sync.RWMutex //用于控制Coordinate结构内变量的访问
var lockBool sync.RWMutex       //控制mapDone 和 reduceDone

//标识任务完成状态
var mapDone bool = false
var reduceDone bool = false

定义两个函数 IsWorkDone AssignTask

//用于判断某一类任务是否完成
func IsWorkDone(tasks []Task) bool {
	coordinateLock.RLock()
	defer coordinateLock.RUnlock()
	for i := range tasks {
		if tasks[i].status != Completed {
			return false
		}
	}
	return true
}

//读取任务状态并分配任务,返回值中int如果为正数则是worker id, 为-1表示该类任务完成,为-2表示所有任务都在执行
func AssignTask(tasks []Task) (*Task, int) {
	coordinateLock.RLock()
	defer coordinateLock.RUnlock()
	for i := range tasks {
		if tasks[i].status == Idle {
			return &tasks[i], i
		}
	}
	if IsWorkDone(tasks) {
		return nil, -1 //work done
	} else {
		return nil, -2 //all in progress wait
	}
}

定义Coordinate中处理两类请求的函数,需要在任务分配后的10秒,检查任务是否完成,如果没有完成,需要重置任务状态到空闲,便于分配给另外一个节点,这里开了一个协程来完成这个事情,只要主函数没有结束,协程不会提前结束,也就是说调用协程的函数(不是main)结束了,协程还可以运行

处理分配任务的请求:

func (c *Coordinator) HandWorkerReq(args *ReqArgs, reply *ReqReply) error {
	if args.ReqNumber == 1 {
		lockBool.RLock()
		if !mapDone {
			if mapTask, mapStatus := AssignTask(c.MapTask); mapStatus >= 0 {
				// lock.Lock()

				reply.TypeName = "map"
				reply.Content = mapTask.inputFileName
				reply.Idx = mapStatus
				reply.NReduce = c.NReduce

				coordinateLock.Lock()
				mapTask.status = InProgress
				coordinateLock.Unlock()
				// lock.Unlock()

				go CheckStatus(mapTask)

			} else if mapStatus == -1 {
				lockBool.RUnlock()
				lockBool.Lock()
				mapDone = true
				lockBool.Unlock()
				lockBool.RLock()
				// lock.Unlock()
			} else {
				// lock.Lock()
				reply.TypeName = "allinprogress"
				// lock.Unlock()
			}
		} else if !reduceDone {
			if redTask, redStatus:=AssignTask(c.ReduceTask); redStatus >= 0 {
				reply.TypeName = "reduce"
				reply.Idx = redStatus
				reply.Content = redTask.inputFileName
				reply.NReduce = c.NReduce

				coordinateLock.Lock()
				redTask.status = InProgress
				coordinateLock.Unlock()

				go CheckStatus(redTask)
			} else if redStatus == -1 {
				lockBool.RUnlock()
				lockBool.Lock()
				reduceDone = true
				lockBool.Unlock()
				lockBool.RLock()
			} else {
				// lock.Lock()
				reply.TypeName = "allinprogress"
				// lock.Unlock()
			}
		} else {
			// lock.Lock()
			reply.TypeName = "finish"
			// lock.Unlock()
		}
		lockBool.RUnlock()
	}
	return nil
}

//检查任务状态
func CheckStatus(t *Task) {
	time.Sleep(10 * time.Second)

	coordinateLock.Lock()
	defer coordinateLock.Unlock()

	if t.status != Completed {
		t.status = Idle
	}
}

处理任务完成消息:

func (c *Coordinator) HandFinishInfo(args *FinishReq, reply *FinishReply) error {
	idx := args.Idx
	if args.TypeName == "map" {
		coordinateLock.Lock()

		if c.MapTask[idx].status != Completed {
			for i := range c.ReduceTask {
				c.ReduceTask[i].inputFileName = append(c.ReduceTask[i].inputFileName, args.Ret[i])
			}
			c.MapTask[idx].status = Completed
		}

		coordinateLock.Unlock()
	}

	if args.TypeName == "reduce" {

		coordinateLock.Lock()

		if c.ReduceTask[idx].status != Completed {
			c.ReduceTask[idx].status = Completed
		}

		coordinateLock.Unlock()
	}
	return nil
}

3实验总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JTrtDnoe-1645363330895)(C:\Users\sonwi\AppData\Roaming\Typora\typora-user-images\image-20220220211427160.png)]

踩坑! 代码可以通过所有的测试,在debug的过程中,有一个版本一直过不了reduce的并行测试,后面看了测试代码才知道,判断并行的方式是在同1秒内有没有多个节点在新建文件,由于之前设置的worker会休息1秒再发送请求(指导书建议这样做),正是请求的间隙,造成了reduce在测试中无法并行,后面不等待直接发送下一个请求就可以通过测试了。

心得! lab1的指导书真的超级详细,基本上有问题了指导书上都可以找到建议,只要认真做,根本不用参考别人的代码,就是debug的时候有点困难,基本靠打印来判断问题。

标签:map,6.824,err,reduce,worker,MapReduce,lab1,kv,reply
来源: https://blog.csdn.net/qq_41084473/article/details/123036273

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有