ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Lab1:实现简单的MapReduce框架

2022-09-02 12:31:59  阅读:187  来源: 互联网

标签:map string 框架 kva MapReduce filename Lab1 file reply


1.目标

Lab1提供了单机串行的MR框架,可以直接运行。需要改写为Master-Slave式的架构,lab1也提供的调用框架和RPC通信示例,我们的任务聚焦在MR框架即可,主要实现的内容为:

  1. Worker不断请求Map任务,Coordinator将Map Task分发给Worker(一个原始输入文件对应一个Map Task)

  2. Map阶段
    a) Worker处理输入文件,Map函数输入为(filename string,content string),其中filename为输入文件名,content为该文件的内容,输出为KV数组;
    b) 我们需要将相同Key的二元组聚集到一起,然后根据Lab1提供的ihash函数将ihash(Key)%ReduceN相同的二元组写到同一中间文件;其中ReduceN为Lab1设定的Reduce job数目。如果我们输入文件为M,那么总的中间文件数目应小于等于M*ReduceN

  3. Worker不断请求Rudce任务,Coordinator将Reduce Task分发给Worker(一个ReduceID对应一个Reduce Job,这里的Job我理解的是Master一次性发给Worker的批量数据,在下次请求Job前,Work需要先把这次的数据处理完)

  4. Reduce阶段
    a) Worker处理ReduceId对应的所有文件,由于一个中间文件中可能有不同的Key,我们需要先聚集相同Key的二元组,然后分别给Reduce处理
    b) Reduce完成后,写入最终文件即可

2.实现

2.1 Worker端

worker.go

我们的Worker会不断的向Master请求任务,Master会将自己的状态(进行到哪一步)同步给Worker,Worker根据自己的状态决定请求Map Task还是Reduce Task

我们封装了Map函数,在调用应用层的Reduce之前,首先处理好数据;在调用之后,写入文件

func DoMap(reduceMax int, mapDone *bool, mapf func(string, string) []KeyValue) {
    //get map task
    reply := GetMapFileReply{}
    getMapFile(&reply, mapDone)
    //get content
    if reply.MaptaskNumber >= 0 {

        file, err := os.Open(reply.Filename)
        if err != nil {
            log.Fatalf("cannot open %v", reply.Filename)
        }
        content, err := ioutil.ReadAll(file)
        if err != nil {
            log.Fatalf("cannot read %v", reply.Filename)
        }
        file.Close()
        //call application map function
        kva := mapf(reply.Filename, string(content))

        sort.Sort(ByKey(kva))

        i := 0
        mapOutFileName := []string{}
        mapOutTmp2Final := make(map[string]string)
        mapOutFileContent := make(map[int][]MapOut)
        //split the content by key
        for i < len(kva) {
            j := i + 1
            for j < len(kva) && kva[j].Key == kva[i].Key {
                j++
            }
            mapout := MapOut{}
            for k := i; k < j; k++ {
                mapout.Value = append(mapout.Value, kva[k].Value)
            }
            mapout.Key = kva[i].Key
            reduceId := ihash(kva[i].Key) % reduceMax
            _, ok := mapOutFileContent[reduceId]
            if !ok {
                filename := "mr-" + strconv.Itoa(reply.MaptaskNumber) + "-" + strconv.Itoa(reduceId)
                //fmt.Println("file name is " + filename)
                mapOutFileName = append(mapOutFileName, filename)
            }
            mapOutFileContent[reduceId] = append(mapOutFileContent[reduceId], mapout)
            i = j
        }
        //generate the intermediate file
        for _, filename := range mapOutFileName {
            //get reduceId according to intermediate file name
            suffix := strings.Split(filename, "-")
            reduceId, _ := strconv.Atoi(suffix[2])

            //file, _ = os.Create(filename)
            file, _ = ioutil.TempFile("", filename+"*")
            mapOutTmp2Final[file.Name()] = filename
            enc := json.NewEncoder(file)
            for _, content := range mapOutFileContent[reduceId] {
                err := enc.Encode(&content)
                if err != nil {
                    fmt.Println("encode failed " + err.Error())
                }
            }
            file.Close()
        }
        //send one map file done
        sendMapDone(reply.MaptaskNumber, reply.Filename, mapOutTmp2Final)
    } else if *mapDone == false && reply.MaptaskNumber < 0 {
        //wait for all map finish
        time.Sleep(1)
    }
}

我们封装了Reduce函数,在调用应用层的Reduce之前,首先处理好数据;在调用之后,写入文件

func DoReduce(reduceDone *bool, reducef func(string, []string) string) {
    reply := GetReduceFileReply{}
    //get reduce task
    getReduceFile(&reply, reduceDone)
    kva := []MapOut{}
    ReduceOutTmp2Final := make(map[string]string)
    if !*reduceDone && reply.ReduceId >= 0 {
        //create out file
        outFileName := "mr-out-" + strconv.Itoa(reply.ReduceId)
        outfile, _ := ioutil.TempFile("", outFileName+"*")
        ReduceOutTmp2Final[outfile.Name()] = outFileName
        defer outfile.Close()

        //read from intermediate file
        for _, filename := range reply.Filename {
            file, _ := os.Open(filename)
            defer file.Close()
            dec := json.NewDecoder(file)
            for {
                var kv MapOut
                if err := dec.Decode(&kv); err != nil {
                    break
                }
                kva = append(kva, kv)
            }
        }
        sort.Sort(MapOutByKey(kva))
        //split content by key
        i := 0
        for i < len(kva) {
            j := i + 1
            for j < len(kva) && kva[j].Key == kva[i].Key {
                j++
            }
            intermediate := []string{}
            for k := i; k < j; k++ {
                intermediate = append(intermediate, kva[k].Value...)
            }
            reduceRes := reducef(kva[i].Key, intermediate)
            fmt.Fprintf(outfile, "%v %v\n", kva[i].Key, reduceRes)
            i = j
        }
        //call application reduce
        sendReduceDone(reply.ReduceId, ReduceOutTmp2Final)
    //wait for all reduce done
    } else if !*reduceDone && reply.ReduceId < 0 {
        time.Sleep(1)
    }
}

worker在完成任务后,会向Master发送“我搞定了”,Master此时检测是否当前阶段的所有任务都已经实现

2.2 Coordinator端

Master端需要对不同阶段、不同Task进度、超时时间等进行记录

type Coordinator struct {
    //map task : not started / running / finished
    mapTaskState map[string]int
    //used for timeout check
    mapTaskTime map[string]int64
    //idicated map task
    mapTaskNumber int
    //if in map step
    mapDone bool

    //intermediate map out file
    mapOutFileArray map[int][]string

    //if in reduce step
    reduceDone bool
    //reduce task : not started / running / finished
    reduceTaskState map[int]int
    //used for timeout check
    reduceTaskTime map[int]int64
    //given by caller,indicated the reduce job
    nReduce int

    //task state lock
    taskStatLock sync.Mutex
    //step state lock
    taskDone sync.Mutex
}

这里Map和Reduce流程类似,如下Map流程:

首先获取Map任务,并分发给Worker

func (c *Coordinator) GetMapInFile(args *GetMapFileArgs, reply *GetMapFileReply) error {
    //get map file
    c.taskDone.Lock()
    if !c.mapDone {
        c.taskDone.Unlock()
        c.taskStatLock.Lock()
        for task, _ := range c.mapTaskState {
            if c.mapTaskState[task] == -1 {
                reply.Filename = task
                reply.MapDone = false
                reply.MaptaskNumber = c.mapTaskNumber
                c.mapTaskNumber++
                c.mapTaskState[task] = 0
                c.mapTaskTime[task] = time.Now().Unix()
                c.taskStatLock.Unlock()
                return nil
            }
        }
        reply.MapDone = false
    } else {
        reply.MapDone = true
    }
    reply.MaptaskNumber = -1
    return nil
}

在收到一个Map Task完成后,记录任务状态并检查是否所有任务完成能进入下一状态

func (c *Coordinator) MapSingleFileDone(args *MapDoneArgs, reply *MapDoneReply) error {
    //set this map task done
    c.taskStatLock.Lock()
    c.mapTaskState[args.Filename] = 1
    c.taskStatLock.Unlock()
    //c.mapTaskDoneCollection = append(c.mapTaskDoneCollection, args.MaptaskNumber)
    //record reduceid <-> intermediate file name
    for tmpfile, filename := range args.MapOutTmp2Final {
        os.Rename(tmpfile, filename)
        suffix := strings.Split(filename, "-")
        reduceN, _ := strconv.Atoi(suffix[2])
        c.mapOutFileArray[reduceN] = append(c.mapOutFileArray[reduceN], filename)
    }
    //test if all map done
    reply.Y = args.MaptaskNumber
    for _, i := range c.mapTaskState {
        if i != 1 {
            return nil
        }
    }
    //if all map done,set reduce task state
    c.taskStatLock.Lock()
    for key := range c.mapOutFileArray {
        c.reduceTaskState[key] = -1
    }
    c.taskStatLock.Unlock()
    c.taskDone.Lock()
    c.mapDone = true
    c.taskDone.Unlock()
    return nil
}

3.Lab1中提到的Tips

  1. 对于所有文件,可以先使用ioutil.TempFile创建临时文件,在任务结束后再改名为最终文件;避免部分中间部分Worker退出或崩溃,导致最终的文件混乱
//创建临时文件,临时文件不可见
file, _ = ioutil.TempFile("", filename+"*")
//创建 临时文件 到 最终文件名的映射
mapOutTmp2Final[file.Name()] = filename
...
//修改为最终文件,因为前两步在Worker中进行,这一步在Master进行,所以使用map来缓存临时文件的文件路径
os.Rename(ReduceOutTmp2Final[tmpName], filename)
  1. 对于每个任务,需要有超时判断,如果任务超时,将任务发给其他Worker来做,这里我另外起了一个线程来做超时判断。

  2. 在go run时,使用-race来判断是否有竞态,及时加锁

4.结果与改进

1.完成了所有测试

2.使用锁之后,明显性能下降很多,需要优化锁的粒度和类型

3.对于任务应该抽象为结构体,使用管道通信,会让流程更简洁

标签:map,string,框架,kva,MapReduce,filename,Lab1,file,reply
来源: https://www.cnblogs.com/vstone/p/16649409.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有