Go言語でMapReduceのフレームワークを作ってみた
前回のErlang、前々回のScalaに続き、Goでも作ってみた。
Goのチャネルは、ScalaのアクターやErlangのプロセスが持っているメールボックスの仕組みと異なり、明示的に作る必要がある(make関数を使う)。また、未受信のメッセージをいくつまで溜めておけるようにするかを指定しなければならない。下記では、その数を1000にしている。
package main import "bufio" import "fmt" import "os" import "strings" type KeyValue struct { Key interface{} Value interface{} } type MapReduce struct { Read func(chan<- *KeyValue) Map func(*KeyValue, chan<- *KeyValue) Reduce func(*KeyValue, chan<- *KeyValue) Write func(*KeyValue) shuffleMap map[interface{}] []interface{} inputCh chan *KeyValue inputExitCh chan interface{} mapExitCh chan interface{} shuffleCh chan *KeyValue shuffleExitCh chan interface{} reduceExitCh chan interface{} outputCh chan *KeyValue outputExitCh chan interface{} } func log(s string) { fmt.Printf("%s\n", s) } // MapReduce構造体の各フィールドを初期化するメソッド func (this *MapReduce) init() { this.shuffleMap = make(map[interface{}] []interface{}) this.inputCh = make(chan *KeyValue, 1000) // ReadはhandleInputを待たない this.inputExitCh = make(chan interface{}) this.mapExitCh = make(chan interface{}) this.shuffleCh = make(chan *KeyValue, 1000) // Mapはshuffleを待たない this.shuffleExitCh = make(chan interface{}) this.reduceExitCh = make(chan interface{}) this.outputCh = make(chan *KeyValue, 1000) // ReduceはhandleOutputを待たない this.outputExitCh = make(chan interface{}) } // KeyValueを繰り返し受け取って処理するメソッド func (this *MapReduce) loopReceive(requestCh <-chan *KeyValue, exitCh chan<- interface{}, acc0 interface{}, process func(acc interface{}, kv *KeyValue) interface{}) { acc := acc0 for { kv := <- requestCh if kv == nil { close(requestCh) exitCh <- acc return } acc = process(acc, kv) } } // Shuffle処理を行うメソッド func (this *MapReduce) shuffle(kv *KeyValue) { array, ok := this.shuffleMap[kv.Key] if ok { // 元の配列の末尾に kv.Value を追加した新しい配列を作る newArray := make([]interface{}, len(array) + 1) for i, v := range(array) { newArray[i] = v } newArray[len(array)] = kv.Value this.shuffleMap[kv.Key] = newArray } else { this.shuffleMap[kv.Key] = []interface{}{kv.Value} } } // Map処理およびReduce処理のコア部分 func (this *MapReduce) processParallel(processName string, kv *KeyValue, process func(*KeyValue, chan<- *KeyValue), outputCh chan<- *KeyValue, exitCh chan<- interface{}) { log(fmt.Sprintf("%s start key=[%v] value=[%v]", processName, kv.Key, kv.Value)) process(kv, outputCh) log(fmt.Sprintf("%s end", processName)) exitCh <- nil } // 複数のゴルーチンが全て完了するまで待つ func (this *MapReduce) waitAll(ch <-chan interface{}, count int) { for i := 0; i < count; i++ { <- ch } } // MapReduce実行 func (this *MapReduce) Execute() { if this.shuffleMap != nil { panic(this) } this.init() // Shuffle処理を行うゴルーチンを起動する go this.loopReceive(this.shuffleCh, this.shuffleExitCh, nil, func(_ interface{}, kv *KeyValue) interface{} { this.shuffle(kv) return nil }) // 入力データを受け取ってMap処理を起動するゴルーチンを起動する go this.loopReceive(this.inputCh, this.inputExitCh, 0, func(mapCount interface{}, kv *KeyValue) interface{} { go this.processParallel("map", kv, this.Map, this.shuffleCh, this.mapExitCh) return mapCount.(int) + 1 }) // 入力データの読み取りを行う this.Read(this.inputCh) // Map処理を起動するゴルーチンが完了するまで待つ this.inputCh <- nil mapCount := <- this.inputExitCh // Map処理を起動した回数 // Map処理が全て完了するまで待つ this.waitAll(this.mapExitCh, mapCount.(int)) // Shuffle処理が完了するまで待つ this.shuffleCh <- nil <- this.shuffleExitCh log("shuffled") // 出力データを受け取るゴルーチンを起動する go this.loopReceive(this.outputCh, this.outputExitCh, nil, func(_ interface{}, kv *KeyValue) interface{} { this.Write(kv) return nil }) // Reduce処理を行うゴルーチンを起動する for k, v := range(this.shuffleMap) { go this.processParallel("reduce", &KeyValue{k, v}, this.Reduce, this.outputCh, this.reduceExitCh) } // Reduce処理が全て完了するまで待つ this.waitAll(this.reduceExitCh, len(this.shuffleMap)) // handleOutputが完了するまで待つ this.outputCh <- nil <- this.outputExitCh log("complete") }
ここまでがフレームワーク部分。次に、使用例。
func main() { inFile, err := os.Open("mapReduce.go", os.O_RDONLY, 0) if err != nil { panic(err) } defer inFile.Close() reader := bufio.NewReader(inFile) mapReduce := MapReduce{ Read: func(inputCh chan<- *KeyValue) { lineNumber := 0 for { line, err := reader.ReadString('\n') // 1行読み出す if err == os.EOF { break } if err != nil { panic(err) } lineNumber++ if lineNumber > 5 { break } // 5行を超えたら終了 inputCh <- &KeyValue{nil, line} } }, Map: func(kv *KeyValue, shuffleCh chan<- *KeyValue) { line := kv.Value.(string) words := strings.FieldsFunc(line, func(c int) bool { // 英数字以外で区切る switch { case '0' <= c && c <= '9': return false case 'A' <= c && c <= 'Z': return false case 'a' <= c && c <= 'z': return false } return true }) for _, word := range(words) { shuffleCh <- &KeyValue{word, 1} } }, Reduce: func(kv *KeyValue, outputCh chan<- *KeyValue) { counts := kv.Value.([]interface{}) outputCh <- &KeyValue{kv.Key, len(counts)} }, Write: func(kv *KeyValue) { fmt.Printf("%5d\t%s\n", kv.Value, kv.Key) }, } mapReduce.Execute() }
その出力例。
$ 8.out map start key=[<nil>] value=[ ] map end map start key=[<nil>] value=[package main ] map end map start key=[<nil>] value=[import "bufio" ] map end map start key=[<nil>] value=[import "fmt" ] map end map start key=[<nil>] value=[import "os" ] map end shuffled reduce start key=[package] value=[[1]] reduce end reduce start key=[os] value=[[1]] 1 package reduce start key=[bufio] value=[[1]] reduce end reduce start key=[fmt] value=[[1]] 1 os reduce end 1 bufio 1 fmt reduce end reduce start key=[import] value=[[1 1 1]] reduce end reduce start key=[main] value=[[1]] reduce end 3 import 1 main complete $