Go言語でMapReduceのフレームワークを作ってみた

前回Erlang前々回Scalaに続き、Goでも作ってみた。

Goのチャネルは、ScalaのアクターやErlangのプロセスが持っているメールボックスの仕組みと異なり、明示的に作る必要がある(make関数を使う)。また、未受信のメッセージをいくつまで溜めておけるようにするかを指定しなければならない。下記では、その数を1000にしている。

package main

import "bufio"
import "fmt"
import "os"
import "strings"

type KeyValue struct {
    Key interface{}
    Value interface{}
}

type MapReduce struct {
    Read func(chan<- *KeyValue)
    Map func(*KeyValue, chan<- *KeyValue)
    Reduce func(*KeyValue, chan<- *KeyValue)
    Write func(*KeyValue)

    shuffleMap map[interface{}] []interface{}
    inputCh chan *KeyValue
    inputExitCh chan interface{}
    mapExitCh chan interface{}
    shuffleCh chan *KeyValue
    shuffleExitCh chan interface{}
    reduceExitCh chan interface{}
    outputCh chan *KeyValue
    outputExitCh chan interface{}
}

func log(s string) {
    fmt.Printf("%s\n", s)
}

// MapReduce構造体の各フィールドを初期化するメソッド
func (this *MapReduce) init() {
    this.shuffleMap = make(map[interface{}] []interface{})
    this.inputCh = make(chan *KeyValue, 1000)           // ReadはhandleInputを待たない
    this.inputExitCh = make(chan interface{})
    this.mapExitCh = make(chan interface{})
    this.shuffleCh = make(chan *KeyValue, 1000)         // Mapはshuffleを待たない
    this.shuffleExitCh = make(chan interface{})
    this.reduceExitCh = make(chan interface{})
    this.outputCh = make(chan *KeyValue, 1000)          // ReduceはhandleOutputを待たない
    this.outputExitCh = make(chan interface{})
}

// KeyValueを繰り返し受け取って処理するメソッド
func (this *MapReduce) loopReceive(requestCh <-chan *KeyValue, exitCh chan<- interface{},
        acc0 interface{}, process func(acc interface{}, kv *KeyValue) interface{}) {

    acc := acc0
    for {
        kv := <- requestCh
        if kv == nil {
            close(requestCh)
            exitCh <- acc
            return
        }
        acc = process(acc, kv)
    }
}

// Shuffle処理を行うメソッド
func (this *MapReduce) shuffle(kv *KeyValue) {
    array, ok := this.shuffleMap[kv.Key]
    if ok {
        // 元の配列の末尾に kv.Value を追加した新しい配列を作る
        newArray := make([]interface{}, len(array) + 1)
        for i, v := range(array) { newArray[i] = v }
        newArray[len(array)] = kv.Value
        this.shuffleMap[kv.Key] = newArray
    } else {
        this.shuffleMap[kv.Key] = []interface{}{kv.Value}
    }
}

// Map処理およびReduce処理のコア部分
func (this *MapReduce) processParallel(processName string, kv *KeyValue,
        process func(*KeyValue, chan<- *KeyValue), outputCh chan<- *KeyValue, exitCh chan<- interface{}) {

    log(fmt.Sprintf("%s start key=[%v] value=[%v]", processName, kv.Key, kv.Value))
    process(kv, outputCh)
    log(fmt.Sprintf("%s end", processName))
    exitCh <- nil
}

// 複数のゴルーチンが全て完了するまで待つ
func (this *MapReduce) waitAll(ch <-chan interface{}, count int) {
    for i := 0; i < count; i++ {
        <- ch
    }
}

// MapReduce実行
func (this *MapReduce) Execute() {
    if this.shuffleMap != nil { panic(this) }
    this.init()

    // Shuffle処理を行うゴルーチンを起動する
    go this.loopReceive(this.shuffleCh, this.shuffleExitCh, nil, func(_ interface{}, kv *KeyValue) interface{} {
        this.shuffle(kv)
        return nil
    })
    // 入力データを受け取ってMap処理を起動するゴルーチンを起動する
    go this.loopReceive(this.inputCh, this.inputExitCh, 0, func(mapCount interface{}, kv *KeyValue) interface{} {
        go this.processParallel("map", kv, this.Map, this.shuffleCh, this.mapExitCh)
        return mapCount.(int) + 1
    })
    // 入力データの読み取りを行う
    this.Read(this.inputCh)
    // Map処理を起動するゴルーチンが完了するまで待つ
    this.inputCh <- nil
    mapCount := <- this.inputExitCh                     // Map処理を起動した回数
    // Map処理が全て完了するまで待つ
    this.waitAll(this.mapExitCh, mapCount.(int))
    // Shuffle処理が完了するまで待つ
    this.shuffleCh <- nil
    <- this.shuffleExitCh
    log("shuffled")

    // 出力データを受け取るゴルーチンを起動する
    go this.loopReceive(this.outputCh, this.outputExitCh, nil, func(_ interface{}, kv *KeyValue) interface{} {
        this.Write(kv)
        return nil
    })
    // Reduce処理を行うゴルーチンを起動する
    for k, v := range(this.shuffleMap) {
        go this.processParallel("reduce", &KeyValue{k, v}, this.Reduce, this.outputCh, this.reduceExitCh)
    }
    // Reduce処理が全て完了するまで待つ
    this.waitAll(this.reduceExitCh, len(this.shuffleMap))
    // handleOutputが完了するまで待つ
    this.outputCh <- nil
    <- this.outputExitCh
    log("complete")
}

ここまでがフレームワーク部分。次に、使用例。

func main() {
    inFile, err := os.Open("mapReduce.go", os.O_RDONLY, 0)
    if err != nil { panic(err) }
    defer inFile.Close()
    reader := bufio.NewReader(inFile)

    mapReduce := MapReduce{
        Read: func(inputCh chan<- *KeyValue) {
            lineNumber := 0
            for {
                line, err := reader.ReadString('\n')                    // 1行読み出す
                if err == os.EOF { break }
                if err != nil { panic(err) }
                lineNumber++
                if lineNumber > 5 { break }                             // 5行を超えたら終了
                inputCh <- &KeyValue{nil, line}
            }
        },
        Map: func(kv *KeyValue, shuffleCh chan<- *KeyValue) {
            line := kv.Value.(string)
            words := strings.FieldsFunc(line, func(c int) bool {        // 英数字以外で区切る
                switch {
                case '0' <= c && c <= '9': return false
                case 'A' <= c && c <= 'Z': return false
                case 'a' <= c && c <= 'z': return false
                }
                return true
            })
            for _, word := range(words) {
                shuffleCh <- &KeyValue{word, 1}
            }
        },
        Reduce: func(kv *KeyValue, outputCh chan<- *KeyValue) {
            counts := kv.Value.([]interface{})
            outputCh <- &KeyValue{kv.Key, len(counts)}
        },
        Write: func(kv *KeyValue) {
            fmt.Printf("%5d\t%s\n", kv.Value, kv.Key)
        },
    }
    mapReduce.Execute()
}

その出力例。

$ 8.out
map start key=[<nil>] value=[
]
map end
map start key=[<nil>] value=[package main
]
map end
map start key=[<nil>] value=[import "bufio"
]
map end
map start key=[<nil>] value=[import "fmt"
]
map end
map start key=[<nil>] value=[import "os"
]
map end
shuffled
reduce start key=[package] value=[[1]]
reduce end
reduce start key=[os] value=[[1]]
    1   package
reduce start key=[bufio] value=[[1]]
reduce end
reduce start key=[fmt] value=[[1]]
    1   os
reduce end
    1   bufio
    1   fmt
reduce end
reduce start key=[import] value=[[1 1 1]]
reduce end
reduce start key=[main] value=[[1]]
reduce end
    3   import
    1   main
complete
$