A Minimal MapReduce Implementation

2022-06-29 22:34:47



0 Overview

MapReduce is a widely used programming model for distributed big-data computation. It was first published by Google; Hadoop is its best-known open-source implementation.

The MapReduce programming model is very simple: as the name suggests, the user only needs to implement a Map function and a Reduce function.

  • Map function (the mapping step): it takes a key-value pair and transforms it into zero or more new key-value pairs, which it emits.

    map (k1, v1) -> list (k2, v2)
    
  • Reduce function (the reduction step): it takes a key together with the list of values produced for that key, and reduces them to a new list of output values.

    reduce (k2, list(v2)) -> list(v3)
    

Examples of tasks it can solve:

  • distributed grep;
  • counting URL access frequency;
  • reversing the web link graph;
  • per-domain term vectors;
  • building inverted indexes;
  • distributed sort.

1 MapReduce Architecture

A picture is worth a thousand words:

[Figure: MapReduce execution overview with numbered steps (screenshot)]

2 Overall Design

The goal is to complete lab1 of MIT 6.824 (2021 Spring).

Clone the code with:

git clone git://g.csail.mit.edu/6.824-golabs-2021 6.824

The master assigns tasks lazily: workers actively trigger task assignment and task completion through RPC, and the master hands different input splits to different workers.

The worker therefore needs RPCs to get a task and to finish a task; the argument and reply types are:

type GetTaskArgs struct {
}

type GetTaskReply struct {
	Type      TaskType
	Filenames []string
	Task_no   int
	NReduce   int
	Err       Errno
}

type FinishTaskArgs struct {
	Type    TaskType
	Task_no int
}

type FinishTaskReply struct {
	Err Errno
}

3 Worker Design

A worker's job is to repeatedly fetch a task, execute it, and report its completion.

Its main loop:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	for {
		args := GetTaskArgs{}
		reply := GetTaskReply{}
		log.Printf("get task request: %v\n", args)
		ok := CallGetTask(&args, &reply)
		log.Printf("recv get task reply: %v\n", reply)
		if !ok || reply.Type == STOP {
			break
		}

		// handle the task by type
		switch reply.Type {
		case MAP:
			if len(reply.Filenames) < 1 {
				log.Fatalf("don't have filename")
			}
			DoMAP(reply.Filenames[0], reply.Task_no, reply.NReduce, mapf)
			// map complete, send msg to master
			finish_args := FinishTaskArgs{
				Type:    MAP,
				Task_no: reply.Task_no,
			}
			finish_reply := FinishTaskReply{}
			log.Printf("finish request: %v\n", finish_args)
			CallFinishTask(&finish_args, &finish_reply)
			log.Printf("recv finish reply: %v\n", finish_reply)
			// time.Sleep(time.Second)
		case REDUCE:
			if len(reply.Filenames) < 1 {
				log.Fatalf("don't have filenames")
			}
			DoReduce(reply.Filenames, reply.Task_no, reducef)
			// reduce complete, send msg to master
			finish_args := FinishTaskArgs{
				Type:    REDUCE,
				Task_no: reply.Task_no,
			}
			finish_reply := FinishTaskReply{}
			log.Printf("finish request: %v\n", finish_args)
			CallFinishTask(&finish_args, &finish_reply)
			log.Printf("recv finish reply: %v\n", finish_reply)
			// time.Sleep(time.Second)
		case WAIT:
			log.Printf("wait task\n")
			time.Sleep(time.Second)
		default:
			time.Sleep(time.Second)
		}
	}
}

The task types are MAP, REDUCE, WAIT, and STOP:

  • MAP: run a map task
  • REDUCE: run a reduce task
  • WAIT: wait for other workers to finish (e.g. the map phase is winding down and no more map tasks can be handed out yet)
  • STOP: the worker stops and exits

The most important parts are the execution of map and reduce tasks.

A map task is executed as follows (steps 2, 3, and 4 in the figure above). Note the write-to-temp-file-then-rename trick at the end: a worker that crashes mid-write never exposes a partially written intermediate file under its final name.

func DoMAP(filename string, task_no int, nReduce int, mapf func(string, string) []KeyValue) {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()

	kva := mapf(filename, string(content))

	sort.Sort(ByKey(kva))

	log.Println("encode to json")
	files := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for i := 0; i < nReduce; i++ {
		ofile, err := ioutil.TempFile("", "mr-tmp*")
		if err != nil {
			log.Fatalf("cannot create temp file")
		}
		defer ofile.Close()

		encoder := json.NewEncoder(ofile)
		encoders[i] = encoder
		files[i] = ofile
	}

	var index int
	for _, kv := range kva {
		index = ihash(kv.Key) % nReduce
		err = encoders[index].Encode(&kv)
		if err != nil {
			log.Fatalf("cannot encode %v", kv)
		}
	}

	// atomically rename
	for i := 0; i < nReduce; i++ {
		filename_tmp := fmt.Sprintf("mr-%d-%d", task_no, i)
		err := os.Rename(files[i].Name(), filename_tmp)
		if err != nil {
			log.Fatalf("cannot rename %v to %v", files[i].Name(), filename_tmp)
		}
	}
}

An interesting detail is that map uses a hash function to route all entries with the same key into the same output file, and hence to the same reduce task:

func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

var index int
for _, kv := range kva {
	index = ihash(kv.Key) % nReduce
	err = encoders[index].Encode(&kv)
	if err != nil {
		log.Fatalf("cannot encode %v", kv)
	}
}

A reduce task is executed as follows (steps 5 and 6 in the figure above):

func DoReduce(filenames []string, task_no int, reducef func(string, []string) string) {
	// read data from mid-file
	kva := make([]KeyValue, 0)
	for _, filename := range filenames {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		defer file.Close()
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
	}

	sort.Sort(ByKey(kva))

	// call Reduce on each distinct key in kva[],
	// and write the result to mr-out-<task_no>.
	ofile, err := ioutil.TempFile("", "mr-out-tmp*")
	if err != nil {
		log.Fatalf("cannot create temp file")
	}
	defer ofile.Close()

	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

		i = j
	}

	output_filename := fmt.Sprintf("mr-out-%d", task_no)
	err = os.Rename(ofile.Name(), output_filename)
	if err != nil {
		log.Fatalf("cannot rename %v to %v", ofile.Name(), output_filename)
	}
}

Ideally the files would be read from and written to GFS; since that is not available here, the plain UNIX file system is used instead.

4 Master Design

The master (the lab calls it Coordinator) is fairly simple and keeps only a small amount of state:

type Coordinator struct {
	tasks   []Task
	nReduce int
	nMap    int
	status  CoordinatorStatus
	mu      sync.Mutex
}

The information kept per task is defined as:

type TaskStatus int

const (
	idle TaskStatus = iota
	in_progress
	completed
)

type Task struct {
	tno       int
	filenames []string
	status    TaskStatus
	startTime time.Time
}

The master mainly serves the worker's two RPC requests.

The get-task RPC handler is implemented as follows:

  • A task that has been in progress for too long (over 10s) is reassigned to another worker.

func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	finish_flag := c.IsAllFinish()
	if finish_flag {
		c.NextPhase()
	}
	for i := 0; i < len(c.tasks); i++ {
		if c.tasks[i].status == idle {
			log.Printf("send task %d to worker\n", i)
			reply.Err = SuccessCode
			reply.Task_no = i
			reply.Filenames = c.tasks[i].filenames
			if c.status == MAP_PHASE {
				reply.Type = MAP
				reply.NReduce = c.nReduce
			} else if c.status == REDUCE_PHASE {
				reply.NReduce = 0
				reply.Type = REDUCE
			} else {
				log.Fatal("unexpected status")
			}
			c.tasks[i].startTime = time.Now()
			c.tasks[i].status = in_progress
			return nil
		} else if c.tasks[i].status == in_progress {
			curr := time.Now()
			if curr.Sub(c.tasks[i].startTime) > time.Second*10 {
				log.Printf("resend task %d to worker\n", i)
				reply.Err = SuccessCode
				reply.Task_no = i
				reply.Filenames = c.tasks[i].filenames
				if c.status == MAP_PHASE {
					reply.Type = MAP
					reply.NReduce = c.nReduce
				} else if c.status == REDUCE_PHASE {
					reply.NReduce = 0
					reply.Type = REDUCE
				} else {
					log.Fatal("unexpected status")
				}
				c.tasks[i].startTime = time.Now()
				return nil
			}
		}
	}
	reply.Err = SuccessCode
	reply.Type = WAIT
	return nil
}

The finish-task RPC handler:

func (c *Coordinator) FinishTask(args *FinishTaskArgs, reply *FinishTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if args.Task_no >= len(c.tasks) || args.Task_no < 0 {
		reply.Err = ParaErrCode
		return nil
	}
	c.tasks[args.Task_no].status = completed
	if c.IsAllFinish() {
		c.NextPhase()
	}
	return nil
}

Check whether all tasks are finished and, if so, advance to the next phase:

func (c *Coordinator) IsAllFinish() bool {
	for i := len(c.tasks) - 1; i >= 0; i-- {
		if c.tasks[i].status != completed {
			return false
		}
	}
	return true
}

func (c *Coordinator) NextPhase() {
	if c.status == MAP_PHASE {
		log.Println("change to REDUCE_PHASE")
		c.MakeReduceTasks()
		c.status = REDUCE_PHASE
	} else if c.status == REDUCE_PHASE {
		log.Println("change to FINISH_PHASE")
		c.status = FINISH_PHASE
	} else {
		log.Println("unexpected status change!")
	}
}

The caller can check whether the whole MapReduce job has finished:

func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.status == FINISH_PHASE
}

5 How Does a Client Use It?

Just write two functions, Map and Reduce:

//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}

6 Appendix

The full code is available at:

Repository

commit

Source: https://www.cnblogs.com/cxl-/p/16425143.html
