ScalaのActorを使ってMapReduceのフレームワークを作ってみた

Actorを使う練習として、簡単なMapReduceフレームワークを作ってみた。練習用なので、シングルプロセスのみ。また、Sort処理は省いてある。

import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.Exit
import scala.actors.Futures

object MapReduce {
    
    /** データを読み込むクラスです。*/
    abstract class Reader[K, V] {
        def init  { log("Reader#init")  }
        def close { log("Reader#close") }
        def read: Iterable[(K, V)]
    }
    
    /** 関数からReaderへの暗黙変換 */
    implicit def funcToReader[K, V](func: => Iterable[(K, V)]) = new Reader[K, V] {
        override def read = func
    }
    
    /** データを出力するクラスです。*/
    abstract class Writer[K, V] {
        def init  { log("Writer#init")  }
        def close { log("Writer#close") }
        def write(key: K, value: V)
    }
    
    /** 関数からWriterへの暗黙変換 */
    implicit def funcToWriter[K, V](func: (K, V) => Unit) = new Writer[K, V] {
        override def write(k: K, v: V) = func(k, v)
    }
    
    /** Shuffle処理を行うアクタークラスです。*/
    class ShuffleActor[K, V] extends Actor {
        import scala.collection.mutable.ListBuffer
        import scala.collection.mutable.Map
        private val shufflingMap = Map.empty[K, ListBuffer[V]]
        override def act = loopAct[K, V] { (k, v) =>
            if (! shufflingMap.contains(k)) shufflingMap(k) = new ListBuffer[V]
            shufflingMap(k) += v
        }
        def shuffledMap = shufflingMap.mapValues(_.toList)
    }
    
    /** ログ出力を行います。 */
    private def log(format: String, args: Any*) = printf(format + "%n", args: _*)
    
    /** 繰り返し処理をするアクターを作ります。*/
    private def loopAct[K, V](func: (K, V) => Unit) = loop {
        react {
            case (k: K, v: V) => func(k, v)
            case Exit => reply; exit
        }
    }
    
    /** Map処理およびReduce処理のコア部分です。*/
    private def processParallel[K1, V1, K2, V2](
        processName: String,
        input: Iterable[(K1, V1)],
        process: (K1, V1) => Iterable[(K2, V2)],
        outputActor: Actor
    ) = {
        // process を並列で実行し、出力を outputActor に渡す
        for ((k1, v1) <- input) yield Futures.future {
            log("%s start key=[%s] value=[%s]", processName, k1, v1)
            process(k1, v1).foreach(outputActor ! _)
            log("%s end", processName)
        }
    }
    
    /**
     * MapReduce で処理を行います。
     * @param reader データを読み込むオブジェクト
     * @param map Map関数
     * @param reduce Reduce関数
     * @param writer データを出力するオブジェクト
     */
    def execute[K1, V1, K2, V2, K3, V3](
        reader: => Reader[K1, V1],
        map: (K1, V1) => Iterable[(K2, V2)],
        reduce: (K2, Iterable[V2]) => Iterable[(K3, V3)],
        writer: Writer[K3, V3]
    ) {
        // Map処理の結果を受け取ってMapに格納するアクター
        val shuffleActor = new ShuffleActor[K2, V2]
        shuffleActor.start
        // Map処理を行う
        reader.init
        val mapFutures = try {
            processParallel("map", reader.read, map, shuffleActor)
        } finally {
            reader.close
        }
        mapFutures.foreach(_())                         // 全てのMapアクターが処理を終えるまで待つ
        shuffleActor !? Exit                            // shuffleActor が処理を終えるまで待つ
        val shuffledMap = shuffleActor.shuffledMap      // 完成したMapを取り出す
        log("shuffled")
        
        // Reduce処理の結果を受け取って出力を行うアクター
        val writeActor = actor(loopAct(writer.write))
        // Reduce処理を行う
        writer.init
        try {
            val reduceFutures = processParallel("reduce", shuffledMap, reduce, writeActor)
            reduceFutures.foreach(_())                  // 全てのReduceアクターが処理を終えるまで待つ
            writeActor !? Exit                          // writeActor が処理を終えるまで待つ
        } finally {
            writer.close
        }
        log("completed")
    }
}

Map処理、Reduce処理はそれぞれ並列で走らせるためにアクターを使っている(Futures.future による)。また、Shuffle処理は1つのアクター(shuffleActor)で担い、各Mapアクターから処理結果のメッセージを受け取る。同様に、最後の出力の処理も1つのアクター(writeActor)で担い、各Reduceアクターから結果のメッセージを受け取る。

使用例として、テキストファイル内の英単語を数えてみる。

// 入力データは、上のソースの初めの5行
val src = scala.io.Source.fromFile("MapReduce.scala", "UTF-8")
val lines = try { src.getLines.take(5).toList } finally { src.close }

MapReduce.execute[Null, String, String, Int, String, Int](
    // reader: => (null, 行)×行数
    lines.map((null, _)),
    // map: (null, 行) => (単語, 1)×単語数
    (_, line) => line.split("""\W+""").filter(_.length > 0).map(_ -> 1),
    // reduce: (単語, (1)×出現回数) => (単語, 出現回数)×1
    (word, counts) => List(word -> counts.size),
    // writer: (単語, 出現回数) => Unit
    (word: String, count: Int) => printf("%5d\t%s%n", count, word)
)

出力例はこちら。行頭がスペースになっているのが、writerによる出力の行。Reduceが全部終わる前にwriterが動き始めるようにしているため、writerによる出力と「reduce」のログ出力とが混ざっている。

warning: there were unchecked warnings; re-run with -unchecked for details
one warning found
Reader#init
Reader#close
map start key=[null] value=[]
map start key=[null] value=[import scala.actors.Futures]
map start key=[null] value=[import scala.actors.Actor]
map start key=[null] value=[import scala.actors.Exit]
map start key=[null] value=[import scala.actors.Actor._]
map end
map end
map end
map end
map end
shuffled
Writer#init
reduce start key=[Futures] value=[List(1)]
reduce start key=[import] value=[List(1, 1, 1, 1)]
    1   Futures
reduce start key=[_] value=[List(1)]
    4   import
reduce start key=[scala] value=[List(1, 1, 1, 1)]
reduce start key=[Actor] value=[List(1, 1)]
reduce end
reduce end
reduce start key=[Exit] value=[List(1)]
reduce start key=[actors] value=[List(1, 1, 1, 1)]
reduce end
    1   _
reduce end
reduce end
reduce end
    4   scala
reduce end
    2   Actor
    1   Exit
    4   actors
Writer#close
completed