仓库地址
前言
在刚开始学习 Go 的时候,就听说过 MIT 6.824(现在叫 6.8540),是一门很经典且口碑很好的分布式课程。两天前我开始学习这门课程,并在刚刚完成了 Lab 1 。
在此总结一下目前对 MapReduce 的理解和做 Lab 1 过程中的心得体会。
MapReduce
翻看 6.824 的课程表,第一节课布置了一篇课前论文,是谷歌在 2004 年发表的 MapReduce: Simplified Data Processing on Large Clusters (中文翻译——叉鸽)看完之后就能理解 MapReduce 的工作方式了。
MapReduce is a programming model and an associ
ated implementation for processing and generating large
data sets. Users specify a map function that processes a
key/value pair to generate a set of intermediate key/value
pairs, and a reduce function that merges all intermediate
values associated with the same intermediate key. Many
real world tasks are expressible in this model, as shown
in the paper.
简单来讲,MapReduce 用于计算可以被表示为键值对模型的问题。其中,用户自定义的 Map 函数将输入数据转换为一组中间键值对,这些中间键值对通过一些处理后交由 Reduce 函数处理,后者将中间键值对转化为计算结果。
我们可以用一个简单的例子来理解。假设我们现在有几篇文章,我们想要统计这几篇文章中,各个单词分别出现了多少次。根据 MapReduce 模型,我们将文章数据交给 Map 函数。此时的输入键值对为{文章名,文章内容}(虽然这里文章名并没有用,但它确实能表示为键值对的形式)。Map 函数在接收到数据后,对文章的单词进行分割然后遍历,发射(Emit)出一串键值对{单词,1}。结合代码或许能更好理解:
func Map(filename string, contents string) []mr.KeyValue {
// 根据非字母的字符分割单词
ff := func(r rune) bool { return !unicode.IsLetter(r) }
words := strings.FieldsFunc(contents, ff)
//遍历所有单词并Emit,kva即为此次Map的输出数据
var kva []mr.KeyValue
for _, w := range words {
kv := mr.KeyValue{Key: w, Value: "1"}
kva = append(kva, kv)
}
return kva
}
func Reduce(key string, values []string) string {
//因为Map输出的值始终为1,所以单词出现次数就是键值对的数量
return strconv.Itoa(len(values))
}
随后我们需要对多次 Map 的输出数据进行处理,以便能将同一单词为键的键值对都交给同一个 Reduce 函数。这种处理有很多种方式,在后面的 Lab 中,将使用哈希桶的方式,但在这个简单的问题中,可以直接对输出数据按照键进行排序,然后将所有同样的单词交给 Reduce 处理。以下就是这个简单的 MapReduce 实现:
type ByKey []mr.KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func main() {
mapf, reducef := Map, Reduce
//读入文章数据
var intermediate []mr.KeyValue
for _, filename := range os.Args[1:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
_ = file.Close()
//进行Map操作
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}
//按键排序
sort.Sort(ByKey(intermediate))
oname := "mr-out-0"
ofile, _ := os.Create(oname)
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
//进行Reduce操作
output := reducef(intermediate[i].Key, values)
_, _ = fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
_ = ofile.Close()
}
通过这种计算键值对的方式,我们还可以解决许多其他实际问题,比如谷歌当初发明 MapReduce 就是为了解决网页排名,倒排索引,访问量统计等问题。
然而,在这个例子中,我们是在一个进程中进行了 Map ,中间处理和 Reduce 操作,并没有提升计算效率。那么如何让 MapReduce 在分布式系统中工作呢?我们先看论文中的图片:
这张图展示了在我们的实现中,MapReduce 操作的完整工作流。当用户程序调用 MapReduce 函数时会发生如下的操作(下列序号与图中序号对应):
用户程序中的 MapReduce 库首先将输入文件划分为 M 个分片,通常每个分片为 16MB 到 64MB(用户可通过可选参数控制)。随后,库会在集群中的机器上启动程序的一些副本。
这些程序的副本中,有一份很特殊,它是 master 副本。其他的副本是被 master 分配了任务的 worker 副本。总计要分配 M 个map任务和 R 个reduce任务。master 选取闲置的 worker 并为每个选取的 worker 分配 map 或 reduce 任务。
被分配 map 任务的 worker 从输入数据分片中读取内容。其解析输入数据中的键值对,并将每个键值对传给用户定义的 map 函数。map 函数输出的中间键值对在内存中缓存。
内存中缓存的键值对会定期地写入本地磁盘,写入的数据会被分区函数划分为 R 个区域。这些在磁盘中缓存的键值对的位置会被发送给 master,master 会将这些位置信息进一步传递给 reduce worker 。
当 master 通知 reduce worker 中间键值对的位置信息后,reduce worker 会通过 RPC 的方式从 map worker 的本地磁盘中读取缓存的数据。当 reduce worker 读取完所有中间数据后,它会对中间数据按照键进行排序,以便将所有键相同的键值对分为一组。因为通常来说,需对键不同的数据会被映射到同一个 reduce 任务中,所以需要对数据排序。如果中间数据总量过大以至于无法放入内存中,则会使用外排序算法(external sort)。
reduce worker 遍历每一个遇到的中间键值对的,它会将键和该键对应的一系列值传递给用户定义的 reduce 函数。reduce 函数的输出会被追加(append)到该 reduce 分区的最终输出文件中。
当所有的 map 和 reduce 任务都执行完毕后,master 会唤醒用户程序。此时,调用 MapReduce 的调应用序会返回到用户代码中。
在成功执行完毕后,MapReduce 的输出可在通过 R 个输出文件访问(每个 reduce 任务一个文件,文件名由用户指定)。通常情况下,用户不需要将这 R 个输出文件合并到一个文件中,用户经常将这些文件作为另一次 MapReduce 调用的输入,或者在另一个能够从多个分区的文件输入的分布式程序中使用这些文件。
懂得原理后,我们就可以着手实现我们的第一个 Lab 了。
Lab:MapReduce
我们先把课程给我们的代码拉取下来:
git clone git://g.csail.mit.edu/6.5840-golabs-2024 6.5840
Lab 1 要求我们修改 /src/mr 中的 coordinator.go worker.go rpc.go 三个文件,来实现一个简单的 MapReduce 系统,运行测试脚本后,会使用几组不同的 Map 和 Reduce 函数来对我们的系统进行测试。
我们首先明确我们的系统结构。在原本给好的示例代码中,worker 通过不断向 coordinator 轮询的方式获得任务。那么我们可以将返回的任务分为以下几种类型:
- map Map任务
- reduce Reduce任务
- wait 等待任务(没有空闲的任务,但任务并未全部完成)
- exit 任务全部完成,通知 worker 退出进程。
Task 结构体的定义如下:
type taskState int
const (
Idle taskState = iota
InProgress
Completed
)
type Task struct {
Type string // "map" , "reduce" , "wait" or "exit"
State taskState
ID int
NMap int
NReduce int
InputFile string
StartTime int64
}
解释一下这里的几个字段:
State 表示任务的状态,可以是 Idle(空闲),InProgress(有 worker 正在处理中),Completed(已完成)。
NMap 和 NReduce 表示用户设定的 Map 任务数量和 Reduce 任务数量,具体意义可以看论文原文。注意该字段必须导出,我因为没导出使得 RPC 传输时丢失字段,调试了很久……
InputFile 表示 Map 任务的输入文件名,而 Reduce 函数的输入则由中间数据的文件名规范约定。
StartTime 标记了任务开始处理的时间,用于识别超时的任务,达到容错的效果。如果在某次扫描中发现任务超时,则直接将它设为 Idle,并重新分配 worker。这种设计的优点是我们可以抛弃 worker 的状态,只记录 Task 的状态。
接下来我们来设计一下 coordinator 的结构:
type Coordinator struct {
mu sync.Mutex
mapTasks []Task
reduceTasks []Task
nMap int
nReduce int
allMapDone bool
allReduceDone bool
}
补充一下一些接口函数:
// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
err := rpc.Register(c)
if err != nil {
return
}
rpc.HandleHTTP()
sockname := coordinatorSock()
_ = os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
err := http.Serve(l, nil)
if err != nil {
}
}()
}
func (c *Coordinator) Done() bool {
c.mu.Lock()
defer c.mu.Unlock()
//当所有map和reduce任务都完成时,返回true
return c.allMapDone && c.allReduceDone
}
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
mapTasks: make([]Task, len(files)),
reduceTasks: make([]Task, nReduce),
nMap: len(files),
nReduce: nReduce,
}
//初始化map任务
for i, file := range files {
c.mapTasks[i] = Task{
Type: "map",
State: Idle,
ID: i,
InputFile: file,
NMap: c.nMap,
NReduce: c.nReduce,
}
}
//初始化reduce任务
for i := 0; i < nReduce; i++ {
c.reduceTasks[i] = Task{
Type: "reduce",
State: Idle,
ID: i,
NMap: c.nMap,
NReduce: c.nReduce,
}
}
c.server()
return &c
}
当 worker 请求任务时,返回一个空闲的 Map 任务或 wait 任务,若 Map 任务已经全部完成,则返回 Reduce 任务:
func (c *Coordinator) RequestTask(args *RequestArgs, reply *TaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.allMapDone {
//检查是否有map任务
for i := 0; i < c.nMap; i++ {
//检测超时任务
if c.mapTasks[i].State == InProgress && time.Now().Unix()-c.mapTasks[i].StartTime > 10 {
c.mapTasks[i].State = Idle
}
//返回一个空闲的map任务
if c.mapTasks[i].State == Idle {
c.mapTasks[i].State = InProgress
c.mapTasks[i].StartTime = time.Now().Unix()
reply.Task = c.mapTasks[i]
log.Printf("Sending task: %+v, nMap: %d, nReduce: %d", reply.Task, reply.Task.NMap, reply.Task.NReduce)
return nil
}
}
//暂时没有空闲任务,通知worker等待
reply.Task = Task{
Type: "wait",
}
return nil
}
if !c.allReduceDone {
//检查是否有reduce任务
for i := 0; i < c.nReduce; i++ {
//检测超时任务
if c.reduceTasks[i].State == InProgress && time.Now().Unix()-c.reduceTasks[i].StartTime > 10 {
c.reduceTasks[i].State = Idle
}
//返回一个空闲的reduce任务
if c.reduceTasks[i].State == Idle {
c.reduceTasks[i].State = InProgress
c.reduceTasks[i].StartTime = time.Now().Unix()
reply.Task = c.reduceTasks[i]
return nil
}
}
//暂时没有空闲任务,通知worker等待
reply.Task = Task{
Type: "wait",
}
return nil
}
//所有任务都完成,通知worker退出
reply.Task = Task{
Type: "exit",
}
return nil
}
同时,我们还需要给 worker 提供一个上报任务完成的接口:
// TaskDone 告诉coordinator任务已经完成
func (c *Coordinator) TaskDone(args *RequestArgs, reply *TaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if args.TaskType == "map" {
c.mapTasks[args.TaskID].State = Completed
} else if args.TaskType == "reduce" {
c.reduceTasks[args.TaskID].State = Completed
}
//检查是否所有map任务都完成
c.allMapDone = true
for i := 0; i < c.nMap; i++ {
if c.mapTasks[i].State != Completed {
c.allMapDone = false
break
}
}
//检查是否所有reduce任务都完成
c.allReduceDone = true
for i := 0; i < c.nReduce; i++ {
if c.reduceTasks[i].State != Completed {
c.allReduceDone = false
break
}
}
if c.allMapDone && c.allReduceDone {
log.Println("all tasks done")
time.Sleep(3 * time.Second)
}
return nil
}
在 worker.go 的 Worker 函数中,不断轮询 coordinator 请求任务,并做相应处理:
// Worker main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := TaskReply{}
ok := call("Coordinator.RequestTask", &RequestArgs{}, &reply)
log.Printf("Received task: %+v, nMap: %d, nReduce: %d", reply.Task, reply.Task.NMap, reply.Task.NReduce)
if !ok {
break
}
task := reply.Task
//检测任务类型
if task.Type == "map" {
//执行map任务
doMap(task, reply.Task.NReduce, mapf)
}
if task.Type == "reduce" {
//执行reduce任务
doReduce(task, reply.Task.NMap, reducef)
}
if task.Type == "wait" {
time.Sleep(1 * time.Second)
}
if task.Type == "exit" {
break
}
//通知Master任务完成
call("Coordinator.TaskDone", &RequestArgs{TaskID: task.ID, TaskType: task.Type}, &TaskReply{})
}
}
doMap 函数根据传入的 mapf 对数据进行处理,按照 mr-Map任务ID-Reduce任务ID 的格式将中间数据写入磁盘,其中 Reduce 任务的 ID 由 ihash(kv.Key) % nReduce 确定,其中 ihash 在示例代码中已经给出。
// doMap 执行map任务
func doMap(task Task, nReduce int, mapf func(string, string) []KeyValue) {
//读取文件内容
filename := task.InputFile
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
err = file.Close()
if err != nil {
return
}
kva := mapf(filename, string(content))
//将map结果写入中间文件
intermediate := make([][]KeyValue, nReduce)
for _, kv := range kva {
reduceTask := ihash(kv.Key) % nReduce
intermediate[reduceTask] = append(intermediate[reduceTask], kv)
}
//将中间文件写入磁盘
for i, kva := range intermediate {
oname := fmt.Sprintf("mrx-%d-%d", task.ID, i)
ofile, _ := os.Create(oname)
enc := json.NewEncoder(ofile)
for _, kv := range kva {
err := enc.Encode(&kv)
if err != nil {
log.Fatalf("cannot encode %v", kv)
}
}
err := os.Rename(oname, fmt.Sprintf("mr-%d-%d", task.ID, i))
if err != nil {
return
}
err = ofile.Close()
if err != nil {
return
}
}
}
doReduce 类似:
// doReduce 执行reduce任务
func doReduce(task Task, nMap int, reducef func(string, []string) string) {
//根据任务ID读取中间文件
var intermediate []KeyValue
for i := 0; i < nMap; i++ {
filename := fmt.Sprintf("mr-%d-%d", i, task.ID)
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
// 读取文件内容
decoder := json.NewDecoder(file)
for {
var kv KeyValue
if err := decoder.Decode(&kv); err != nil {
break
}
intermediate = append(intermediate, kv)
}
err = file.Close()
if err != nil {
return
}
}
// 按键排序(方便分组)
sort.Slice(intermediate, func(i, j int) bool {
return intermediate[i].Key < intermediate[j].Key
})
// 对每个键调用 Reduce 函数
oname := fmt.Sprintf("mr-xout-%d", task.ID) //临时文件
ofile, _ := os.Create(oname)
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// 写入最终输出文件
_, err := fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
if err != nil {
return
}
i = j
}
//重命名文件,保证原子性
err := os.Rename(oname, fmt.Sprintf("mr-out-%d", task.ID))
if err != nil {
return
}
//关闭文件
err = ofile.Close()
if err != nil {
return
}
}
这两个函数有一个特别要注意的点,在论文的容错处理中也有提及。简单的来讲,若我们的 MapReduce 函数是确定性函数,我们可以借助底层文件系统的原子性重命名操作,来保证最终的文件系统中仅包含来自一次 reduce 任务输出的数据。
在上面的代码中,我们先将输出数据写到一个临时文件中,当文件写入成功,我们再将文件名更名为正式文件名,以防出现文件写入失败导致后续 worker 无法写入数据的情况。
在完成 worker 的编写后,我们就可以进行测试了。在 linux 环境下运行 main 文件夹中的 test-mr.sh 脚本,就可以自动测试所有测试用例。