ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MIT 6.824 Lab 1: MapReduce

2021-10-13 14:06:12  阅读:346  来源: 互联网

标签:6.824 return string err int Lab nil reply MIT


笑死,这个实验像是在做需求,不过没钱拿QAQ

文章目录

需要注意的

  1. 中间文件名为mr-X-Y:每次worker发送map RPC请求时,coordinator返回一个输入文件名,并分配一个递增的任务编号作为X(Y为reduce分区号)
  2. Done()退出,多打日志看看中间有没有data race,我的实现逻辑是当mapArray(map任务数组) mapMap(map任务crash-recovery标记) reduceArray(reduce任务数组) reduceMap(reduce任务crash-recovery标记) 长度全为0时退出
  3. 排序:reduce worker读取按nReduce分组的所有中间文件,再对其中的键值对整体排序;每个分组对应的中间文件名列表用map保存在Coordinator struct中
  4. RPC-Server 没有在Client注册回调接口,所以crash的时候直接把任务放回任务数组就好
  5. 因为Golang不太熟,所以踩了几个坑:RPC参数/回复结构体的类型名和字段名都需要首字母大写,才能被正常序列化/反序列化;GoLand传参有时候会出问题,直接命令行启动最好;
  6. data race我通过多加了几个mutex解决,可以优化一下减小粒度
  7. bash脚本中设置LC_COLLATE=C,使sort命令按字节序排序(大写字母排在小写字母之前),与Go的字符串比较顺序一致

代码

coordinator

package mr

import (
	"log"
	"sync"
	"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

// Coordinator tracks the map and reduce tasks of one MapReduce job and hands
// them out to workers over RPC.
//
// Each task queue and each in-flight set has its own mutex. WorkerStatus and
// Done acquire mapTaskLock, mapTaskMapLock, reduceTaskLock, reduceTaskMapLock
// in that order; code that needs several of these locks (plus midFileLock,
// taken last) should follow the same order to avoid deadlock.
type Coordinator struct {
	// midFileLock guards midFile.
	midFileLock sync.Mutex
	// nReduce is the number of reduce partitions, fixed at construction.
	nReduce int
	// midFile maps a reduce partition index to the intermediate file names
	// registered by completed map tasks for that partition.
	midFile map[int][]string

	// mapTaskLock guards mapTask.
	mapTaskLock sync.Mutex
	// mapTaskMapLock guards mapTaskNum and mapTaskMap.
	mapTaskMapLock sync.Mutex
	// mapTaskNum is the last task number handed out for a map assignment
	// (the X in mr-X-Y); it increases on every assignment, including retries.
	mapTaskNum int
	// mapTask is the queue of input files still waiting to be mapped.
	mapTask []string
	// mapTaskMap holds the task numbers of map assignments in flight.
	mapTaskMap map[int]bool

	// reduceTaskLock guards reduceTask.
	reduceTaskLock sync.Mutex
	// reduceTaskMapLock guards reduceTaskNum and reduceTaskMap.
	reduceTaskMapLock sync.Mutex
	// reduceTaskNum counts reduce assignments handed out.
	reduceTaskNum int
	// reduceTask is the queue of reduce partition indexes still to be run.
	reduceTask []int
	// reduceTaskMap holds the partition indexes of reduce tasks in flight.
	reduceTaskMap map[int]bool
}

// Example is the sample RPC handler from the lab skeleton: it sets reply.Y
// to args.X + 1. The argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	incremented := 1 + args.X
	reply.Y = incremented
	return nil
}
// WorkerStatus tells a polling worker what to do next by setting reply.Status:
//
//   - 0: map tasks are queued; fetch one with MapRequest.
//   - 1: every map task has finished and reduce tasks are queued; fetch one
//     with ReduceRequest.
//   - 2: nothing can be handed out right now (tasks are still in flight);
//     wait and ask again.
//   - 3: every map and reduce task has finished; the worker may exit.
//
// All four task locks are held so the decision is made on one consistent
// snapshot of the queues and in-flight sets.
func (c *Coordinator) WorkerStatus(args *WorkerStatusArgs, reply *WorkerStatusReply) error {
	c.mapTaskLock.Lock()
	defer c.mapTaskLock.Unlock()
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()
	c.reduceTaskLock.Lock()
	defer c.reduceTaskLock.Unlock()
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()

	switch {
	case len(c.mapTask) > 0:
		reply.Status = 0
	case len(c.mapTaskMap) > 0:
		// Reduce may not start until every map task has reported back.
		reply.Status = 2
	case len(c.reduceTask) > 0:
		reply.Status = 1
	case len(c.reduceTaskMap) > 0:
		// Reduce tasks are running; one may still time out and be re-queued.
		reply.Status = 2
	default:
		// Previously this value was unconditionally overwritten with 2,
		// so workers were never told to exit.
		reply.Status = 3
	}
	log.Printf("get worker status,status is %d,len maptask %d,len maptaskmap %d,len reducetask %d len reducetask map %d",
		reply.Status, len(c.mapTask), len(c.mapTaskMap), len(c.reduceTask), len(c.reduceTaskMap))
	return nil
}
// MapRequest hands the next queued input file to a worker as a map task.
//
// Every assignment gets a fresh, increasing task number (reply.MapTaskNum,
// the X in the intermediate names mr-X-Y). If the worker has not reported the
// task finished via MapOverRequest within 10 seconds, the file is put back on
// the queue. When the queue is empty, reply.Err is set to "Empty queue".
func (c *Coordinator) MapRequest(args *MapCallArgs, reply *MapCallReply) error {
	// Hold both locks while moving the task from the queue to the in-flight
	// set, so WorkerStatus/Done never see it in neither place (which would
	// let the reduce phase start early). Order matches WorkerStatus.
	c.mapTaskLock.Lock()
	defer c.mapTaskLock.Unlock()
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()

	if len(c.mapTask) == 0 {
		reply.Value = ""
		reply.Err = "Empty queue"
		return nil
	}
	file := c.mapTask[0]
	c.mapTask = c.mapTask[1:]

	c.mapTaskNum++
	taskNum := c.mapTaskNum
	c.mapTaskMap[taskNum] = true

	reply.MapTaskNum = taskNum
	reply.Value = file
	reply.NReduce = c.nReduce
	reply.Err = ""
	log.Printf("%d task be sent,filename is %s", taskNum, file)

	go func() {
		time.Sleep(10 * time.Second)
		// Same lock order as above and as WorkerStatus/Done; the previous
		// reversed order could deadlock against them.
		c.mapTaskLock.Lock()
		defer c.mapTaskLock.Unlock()
		c.mapTaskMapLock.Lock()
		defer c.mapTaskMapLock.Unlock()
		if _, ok := c.mapTaskMap[taskNum]; ok {
			log.Printf("task %d fail,file %s come back", taskNum, file)
			delete(c.mapTaskMap, taskNum)
			c.mapTask = append(c.mapTask, file)
		}
	}()
	return nil
}
// MapOverRequest records that map task args.TaskId has finished. It appends
// the task's intermediate files (args.RegisterFile, keyed by reduce
// partition) to the per-partition lists and removes the task from the
// in-flight set.
//
// A report for a task number that is no longer in flight is ignored. That
// happens when the task already timed out and its input was re-queued. The
// new assignment will produce the same data, so registering these files too
// would make reducers count every pair twice.
func (c *Coordinator) MapOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {
	c.mapTaskMapLock.Lock()
	defer c.mapTaskMapLock.Unlock()

	log.Printf("map over request be called, TaskId %d ", args.TaskId)
	if _, ok := c.mapTaskMap[args.TaskId]; !ok {
		log.Printf("TaskId %d is not in flight, ignoring its files", args.TaskId)
		return nil
	}

	c.midFileLock.Lock()
	defer c.midFileLock.Unlock()
	for i := 0; i < c.nReduce; i++ {
		log.Printf("will register %d files", len(args.RegisterFile[i]))
		c.midFile[i] = append(c.midFile[i], args.RegisterFile[i]...)
	}
	delete(c.mapTaskMap, args.TaskId)
	return nil
}

// ReduceRequest hands the next queued reduce partition to a worker.
//
// reply.TaskId is the partition index and reply.TaskName lists the
// intermediate files registered for it. If the worker has not reported the
// task finished via ReduceOverRequest within 10 seconds, the partition is put
// back on the queue. When the queue is empty, reply.Err is set to
// "Empty queue".
func (c *Coordinator) ReduceRequest(args *ReduceCallArgs, reply *ReduceCallReply) error {
	// Hold both locks while moving the task from the queue to the in-flight
	// set, so WorkerStatus/Done never see it in neither place. Order matches
	// WorkerStatus. Every return path releases them via defer; previously the
	// empty-queue path returned with reduceTaskLock still held and hung the
	// coordinator.
	c.reduceTaskLock.Lock()
	defer c.reduceTaskLock.Unlock()
	c.reduceTaskMapLock.Lock()
	defer c.reduceTaskMapLock.Unlock()

	if len(c.reduceTask) == 0 {
		log.Printf("reduceTask is empty")
		reply.Err = "Empty queue"
		return nil
	}
	taskID := c.reduceTask[0]
	c.reduceTask = c.reduceTask[1:]
	log.Printf("will send reduce task,task id is %d", taskID)

	// Copy the file list so the reply does not share a backing array with
	// c.midFile, which is only safe to read under midFileLock.
	c.midFileLock.Lock()
	files := append([]string(nil), c.midFile[taskID]...)
	c.midFileLock.Unlock()

	c.reduceTaskNum++
	c.reduceTaskMap[taskID] = true
	reply.TaskId = taskID
	reply.TaskName = files
	reply.Err = ""
	log.Printf("will dispose %d file,task id is %d", len(files), taskID)

	go func() {
		time.Sleep(10 * time.Second)
		c.reduceTaskLock.Lock()
		defer c.reduceTaskLock.Unlock()
		c.reduceTaskMapLock.Lock()
		defer c.reduceTaskMapLock.Unlock()
		if _, ok := c.reduceTaskMap[taskID]; ok {
			log.Printf("task %d fail", taskID)
			delete(c.reduceTaskMap, taskID)
			c.reduceTask = append(c.reduceTask, taskID)
		}
	}()
	return nil
}
// ReduceOverRequest marks reduce task args.TaskId as finished by removing it
// from the in-flight set. The timeout goroutine started by ReduceRequest
// checks that set, so a finished task is not re-queued.
func (c *Coordinator) ReduceOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {
	id := args.TaskId
	c.reduceTaskMapLock.Lock()
	delete(c.reduceTaskMap, id)
	log.Printf("task %d delete", id)
	c.reduceTaskMapLock.Unlock()
	return nil
}

// server registers c with the default net/rpc server, exposes it over HTTP,
// and serves requests from worker.go in a background goroutine on the
// UNIX-domain socket returned by coordinatorSock. It exits the process if
// the socket cannot be opened.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	// TCP alternative: net.Listen("tcp", ":1234")
	sock := coordinatorSock()
	os.Remove(sock) // drop a stale socket file left by an earlier run
	listener, err := net.Listen("unix", sock)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}

// Done is called periodically by main/mrcoordinator.go. It reports true once
// no map or reduce task is either queued or in flight.
func (c *Coordinator) Done() bool {
	// Take the task locks in the same order as WorkerStatus so the four
	// lengths below are read as one consistent snapshot; release in reverse.
	held := []*sync.Mutex{&c.mapTaskLock, &c.mapTaskMapLock, &c.reduceTaskLock, &c.reduceTaskMapLock}
	for _, mu := range held {
		mu.Lock()
	}
	defer func() {
		for i := len(held) - 1; i >= 0; i-- {
			held[i].Unlock()
		}
	}()

	mapPhaseDone := len(c.mapTask) == 0 && len(c.mapTaskMap) == 0
	reducePhaseDone := len(c.reduceTask) == 0 && len(c.reduceTaskMap) == 0
	return mapPhaseDone && reducePhaseDone
}

// MakeCoordinator creates a Coordinator for the given input files and nReduce
// reduce partitions, and starts its RPC server before returning.
// main/mrcoordinator.go calls this function.
//
// Every input file becomes one queued map task, and partitions
// 0..nReduce-1 become queued reduce tasks.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		nReduce: nReduce,
		// Copy the caller's slice: the queue is later re-sliced and appended
		// to when tasks are re-queued, which must never write into the
		// caller's backing array.
		mapTask:       append([]string(nil), files...),
		mapTaskMap:    make(map[int]bool),
		reduceTaskMap: make(map[int]bool),
		midFile:       make(map[int][]string, nReduce),
	}
	for i := 0; i < nReduce; i++ {
		c.reduceTask = append(c.reduceTask, i)
	}
	// All fields must be initialised before the server can deliver RPCs.
	c.server()
	return c
}

worker

package mr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
	"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

// KeyValue is a single key/value pair; Map functions return a slice of these.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface, ordering KeyValue pairs by Key.
type ByKey []KeyValue

// Len returns the number of pairs.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less reports whether pair i's key sorts before pair j's key.
func (kvs ByKey) Less(i, j int) bool {
	left, right := kvs[i].Key, kvs[j].Key
	return left < right
}

// ihash returns the 32-bit FNV-1a hash of key with the sign bit cleared, so
// the result is never negative. Use ihash(key) % NReduce to choose the reduce
// task for each KeyValue emitted by Map.
func ihash(key string) int {
	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(key)); err != nil {
		return 0
	}
	const signMask = 0x7fffffff
	sum := hasher.Sum32()
	return int(sum & signMask)
}
// GetMidFileName returns the intermediate file name "mr-X-Y" for map task
// number x and reduce partition y.
func GetMidFileName(x int, y int) string {
	return "mr-" + strconv.Itoa(x) + "-" + strconv.Itoa(y)
}

// Worker is called by main/mrworker.go. It repeatedly asks the coordinator
// what to do via WorkerStatus and then runs a map task, runs a reduce task,
// or waits. It returns once the coordinator reports that all work is done
// (status 3) or a status RPC fails.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		args := WorkerStatusArgs{}
		reply := WorkerStatusReply{}
		if !call("Coordinator.WorkerStatus", &args, &reply) {
			return
		}
		switch reply.Status {
		case 0:
			ExecuteMapTask(mapf)
		case 1:
			ExecuteReduceTask(reducef)
		case 2:
			time.Sleep(2 * time.Second)
		case 3:
			log.Printf("all task over ,worker will quit")
			// A bare `break` here would only leave the switch, not the loop.
			return
		}
		log.Printf("task status: %d", reply.Status)
	}

	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
}

// ExecuteMapTask asks the coordinator for one map task and runs mapf over the
// assigned input file. It splits the output into reply.NReduce intermediate
// files named mr-<MapTaskNum>-<partition>, one JSON-encoded KeyValue per line,
// and reports the file names back via MapOverRequest.
//
// If no task is assigned or any step fails, it returns without reporting; the
// coordinator re-queues unreported tasks after its timeout.
func ExecuteMapTask(mapf func(string, string) []KeyValue) {
	args := MapCallArgs{}
	reply := MapCallReply{}
	if !call("Coordinator.MapRequest", &args, &reply) {
		return
	}
	if reply.Err != "" || reply.Value == "" {
		// E.g. another worker took the last task between WorkerStatus and
		// MapRequest; previously this crashed the worker on os.Open("").
		log.Printf("no map task assigned: %s", reply.Err)
		return
	}
	if reply.NReduce <= 0 {
		log.Printf("invalid NReduce %d for map task %d", reply.NReduce, reply.MapTaskNum)
		return
	}

	content, err := ioutil.ReadFile(reply.Value)
	if err != nil {
		log.Fatalf("cannot read %v: %v", reply.Value, err)
	}
	kva := mapf(reply.Value, string(content))

	// Partition the pairs by reduce task.
	partitions := make(map[int][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		y := ihash(kv.Key) % reply.NReduce
		partitions[y] = append(partitions[y], kv)
	}

	registerFile := make(map[int][]string, reply.NReduce)
	for y := 0; y < reply.NReduce; y++ {
		filename := GetMidFileName(reply.MapTaskNum, y)
		if err := writeMapPartition(filename, partitions[y]); err != nil {
			log.Printf("write intermediate file %s: %v", filename, err)
			return
		}
		log.Printf("file write ok,will append registerfile subscript is %d,filename %s", y, filename)
		registerFile[y] = append(registerFile[y], filename)
	}

	overArgs := OverRequestArgs{TaskId: reply.MapTaskNum, RegisterFile: registerFile}
	overReply := OverRequestReply{}
	log.Printf("task ok,begin return %s, register file size %d", reply.Value, len(registerFile))
	call("Coordinator.MapOverRequest", &overArgs, &overReply)
}

// writeMapPartition writes kvs to name as a stream of JSON objects. It encodes
// into a temporary file in the current directory, closes it, then renames it
// onto name, so a crash never leaves a partially written name behind.
func writeMapPartition(name string, kvs []KeyValue) error {
	tmp, err := ioutil.TempFile(".", "tmp")
	if err != nil {
		return err
	}
	renamed := false
	defer func() {
		if !renamed {
			os.Remove(tmp.Name()) // best-effort cleanup of the abandoned temp file
		}
	}()

	enc := json.NewEncoder(tmp)
	for i := range kvs {
		if err := enc.Encode(&kvs[i]); err != nil {
			tmp.Close() // the encode error is the one worth reporting
			return err
		}
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Rename(tmp.Name(), name); err != nil {
		return err
	}
	renamed = true
	return nil
}

func ExecuteReduceTask(reducef func(string, []string) string){
	args := ReduceCallArgs{}
	reply := ReduceCallReply{}
	call("Coordinator.ReduceRequest",&args,&reply)
	log.Printf("will dispose task %d",reply.TaskId)
	tmpFile, err2 := ioutil.TempFile(".", "tmp")
	if err2 != nil {
		log.Println("文件创建失败")
		return
	}
	defer func(name string) {
		err := os.Remove(name)
		if err != nil {
			return
		}
	}(tmpFile.Name())
	log.Printf("Reduce task file size: %d",len(reply.TaskName))
	var kva []KeyValue

	//range all files get slice
	for _, fileName := range reply.TaskName {
		log.Printf("range taskname is %s", fileName)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatalf("cannot open %v", file)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
	}
	//sort slices
	sort.Sort(ByKey(kva))

	//get result and print to final file
	var nowStr string
	var values []string
	for subscript,kv :=range kva{
		if subscript==len(kva)-1 {
			if nowStr!=""{
				output:= reducef(kv.Key,values)
				_, err := fmt.Fprintf(tmpFile, "%v %v\n", nowStr, output)
				if err != nil {
					log.Printf("output to tmp file fail")
					return
				}
			}
			break
		}
		if kv.Key== nowStr {
			values = append(values, kv.Value)
		}else{
			if nowStr=="" {
				nowStr = kv.Key
				values = nil
				values = append(values, kv.Value)
			}else{
				output:= reducef(kv.Key,values)
				_, err := fmt.Fprintf(tmpFile, "%v %v\n", kv.Key, output)
				if err != nil {
					log.Printf("output to tmp file fail")
					return
				}
				nowStr = kv.Key
				values = nil
				values = append(values, kv.Value)
			}
		}
	}

	fileName := "mr-out-"
	fileName+=strconv.Itoa(reply.TaskId)
	err := os.Remove(fileName)
	if err != nil {
		log.Printf("file not exist")
	}
	err = os.Rename(tmpFile.Name(), fileName)
	if err != nil {
		log.Printf("rename file fail")
		return
	}
	OverRequestArgs := OverRequestArgs{}
	OverRequestReply := OverRequestReply{}
	OverRequestArgs.TaskId = reply.TaskId
	log.Printf("reduce task ok,begin return %d", reply.TaskId)
	call("Coordinator.ReduceOverRequest",&OverRequestArgs,&OverRequestReply)

}

// CallExample sends the sample Example RPC with X = 99 to the coordinator and
// prints the reply; with the skeleton handler reply.Y should be 100.
func CallExample() {
	args := ExampleArgs{X: 99}
	var reply ExampleReply
	call("Coordinator.Example", &args, &reply)
	fmt.Printf("reply.Y %v\n", reply.Y)
}

// call sends the RPC rpcname to the coordinator over its UNIX-domain socket
// and waits for the response. It returns true on success. On an RPC error it
// prints the error and returns false. If the coordinator cannot be dialed,
// the process exits via log.Fatal.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// TCP alternative: rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer client.Close() // a close error is deliberately ignored

	if err := client.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}

rpc

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// ExampleArgs and ExampleReply are the argument and reply types of the
// sample Coordinator.Example RPC, showing how RPC types are declared.
type (
	ExampleArgs struct {
		X int
	}

	ExampleReply struct {
		Y int
	}
)

// RPC argument and reply types exchanged between workers and the
// coordinator. Type and field names are exported so net/rpc's gob encoding
// can serialize them.
type (
	// MapCallArgs is the (empty) request for Coordinator.MapRequest.
	MapCallArgs struct{}

	// MapCallReply describes an assigned map task: Value is the input file,
	// NReduce the number of reduce partitions, and MapTaskNum the task number
	// used in intermediate file names. Err is non-empty when nothing was
	// assigned.
	MapCallReply struct {
		Err        string
		Value      string
		NReduce    int
		MapTaskNum int
	}

	// ReduceCallArgs is the (empty) request for Coordinator.ReduceRequest.
	ReduceCallArgs struct{}

	// ReduceCallReply describes an assigned reduce task: TaskName lists the
	// intermediate files of partition TaskId. Err is non-empty when nothing
	// was assigned.
	ReduceCallReply struct {
		TaskName []string
		TaskId   int
		Err      string
	}

	// WorkerStatusArgs is the (empty) request for Coordinator.WorkerStatus.
	WorkerStatusArgs struct{}

	// WorkerStatusReply carries the instruction code for the worker.
	WorkerStatusReply struct {
		Status int
	}

	// OverRequestArgs reports a finished task. RegisterFile maps a reduce
	// partition to intermediate file names; it is filled in by map tasks.
	OverRequestArgs struct {
		TaskId       int
		RegisterFile map[int][]string
	}

	// OverRequestReply is the (empty) reply to a task-finished report.
	OverRequestReply struct{}

	// ReduceOverRequestArgs and ReduceOverRequestReply are not referenced by
	// the coordinator or worker code shown alongside them.
	ReduceOverRequestArgs struct {
		PId int
	}

	ReduceOverRequestReply struct{}
)
// coordinatorSock returns the UNIX-domain socket path the coordinator listens
// on: "/var/tmp/824-mr-" followed by the caller's uid, which makes the name
// unique-ish per user. The socket lives in /var/tmp rather than the current
// directory because Athena AFS does not support UNIX-domain sockets.
func coordinatorSock() string {
	const prefix = "/var/tmp/824-mr-"
	uid := strconv.Itoa(os.Getuid())
	return prefix + uid
}

标签:6.824,return,string,err,int,Lab,nil,reply,MIT
来源: https://blog.csdn.net/treblez/article/details/120741037

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有