ScalaのActorを使ってMapReduceのフレームワークを作ってみた
Actorを使う練習として、簡単なMapReduceのフレームワークを作ってみた。練習用なので、シングルプロセスのみ。また、Sort処理は省いてある。
import scala.actors.Actor import scala.actors.Actor._ import scala.actors.Exit import scala.actors.Futures object MapReduce { /** データを読み込むクラスです。*/ abstract class Reader[K, V] { def init { log("Reader#init") } def close { log("Reader#close") } def read: Iterable[(K, V)] } /** 関数からReaderへの暗黙変換 */ implicit def funcToReader[K, V](func: => Iterable[(K, V)]) = new Reader[K, V] { override def read = func } /** データを出力するクラスです。*/ abstract class Writer[K, V] { def init { log("Writer#init") } def close { log("Writer#close") } def write(key: K, value: V) } /** 関数からWriterへの暗黙変換 */ implicit def funcToWriter[K, V](func: (K, V) => Unit) = new Writer[K, V] { override def write(k: K, v: V) = func(k, v) } /** Shuffle処理を行うアクタークラスです。*/ class ShuffleActor[K, V] extends Actor { import scala.collection.mutable.ListBuffer import scala.collection.mutable.Map private val shufflingMap = Map.empty[K, ListBuffer[V]] override def act = loopAct[K, V] { (k, v) => if (! shufflingMap.contains(k)) shufflingMap(k) = new ListBuffer[V] shufflingMap(k) += v } def shuffledMap = shufflingMap.mapValues(_.toList) } /** ログ出力を行います。 */ private def log(format: String, args: Any*) = printf(format + "%n", args: _*) /** 繰り返し処理をするアクターを作ります。*/ private def loopAct[K, V](func: (K, V) => Unit) = loop { react { case (k: K, v: V) => func(k, v) case Exit => reply; exit } } /** Map処理およびReduce処理のコア部分です。*/ private def processParallel[K1, V1, K2, V2]( processName: String, input: Iterable[(K1, V1)], process: (K1, V1) => Iterable[(K2, V2)], outputActor: Actor ) = { // process を並列で実行し、出力を outputActor に渡す for ((k1, v1) <- input) yield Futures.future { log("%s start key=[%s] value=[%s]", processName, k1, v1) process(k1, v1).foreach(outputActor ! _) log("%s end", processName) } } /** * MapReduce で処理を行います。 * @param reader データを読み込むオブジェクト * @param map Map関数 * @param reduce Reduce関数 * @param writer データを出力するオブジェクト */ def execute[K1, V1, K2, V2, K3, V3]( reader: => Reader[K1, V1], map: (K1, V1) => Iterable[(K2, V2)], reduce: (K2, Iterable[V2]) => Iterable[(K3, V3)], writer: Writer[K3, V3] ) { // Map処理の結果を受け取ってMapに格納するアクター val shuffleActor = new ShuffleActor[K2, V2] shuffleActor.start // Map処理を行う reader.init val mapFutures = try { processParallel("map", reader.read, map, shuffleActor) } finally { reader.close } mapFutures.foreach(_()) // 全てのMapアクターが処理を終えるまで待つ shuffleActor !? Exit // shuffleActor が処理を終えるまで待つ val shuffledMap = shuffleActor.shuffledMap // 完成したMapを取り出す log("shuffled") // Reduce処理の結果を受け取って出力を行うアクター val writeActor = actor(loopAct(writer.write)) // Reduce処理を行う writer.init try { val reduceFutures = processParallel("reduce", shuffledMap, reduce, writeActor) reduceFutures.foreach(_()) // 全てのReduceアクターが処理を終えるまで待つ writeActor !? Exit // writeActor が処理を終えるまで待つ } finally { writer.close } log("completed") } }
Map処理、Reduce処理はそれぞれ並列で走らせるためにアクターを使っている(Futures.future による)。また、Shuffle処理は1つのアクター(shuffleActor)で担い、各Mapアクターから処理結果のメッセージを受け取る。同様に、最後の出力の処理も1つのアクター(writeActor)で担い、各Reduceアクターから結果のメッセージを受け取る。
使用例として、テキストファイル内の英単語を数えてみる。
// 入力データは、上のソースの初めの5行 val src = scala.io.Source.fromFile("MapReduce.scala", "UTF-8") val lines = try { src.getLines.take(5).toList } finally { src.close } MapReduce.execute[Null, String, String, Int, String, Int]( // reader: => (null, 行)×行数 lines.map((null, _)), // map: (null, 行) => (単語, 1)×単語数 (_, line) => line.split("""\W+""").filter(_.length > 0).map(_ -> 1), // reduce: (単語, (1)×出現回数) => (単語, 出現回数)×1 (word, counts) => List(word -> counts.size), // writer: (単語, 出現回数) => Unit (word: String, count: Int) => printf("%5d\t%s%n", count, word) )
出力例はこちら。行頭がスペースになっているのが、writerによる出力の行。Reduceが全部終わる前にwriterが動き始めるようにしているため、writerによる出力と「reduce」のログ出力とが混ざっている。
warning: there were unchecked warnings; re-run with -unchecked for details one warning found Reader#init Reader#close map start key=[null] value=[] map start key=[null] value=[import scala.actors.Futures] map start key=[null] value=[import scala.actors.Actor] map start key=[null] value=[import scala.actors.Exit] map start key=[null] value=[import scala.actors.Actor._] map end map end map end map end map end shuffled Writer#init reduce start key=[Futures] value=[List(1)] reduce start key=[import] value=[List(1, 1, 1, 1)] 1 Futures reduce start key=[_] value=[List(1)] 4 import reduce start key=[scala] value=[List(1, 1, 1, 1)] reduce start key=[Actor] value=[List(1, 1)] reduce end reduce end reduce start key=[Exit] value=[List(1)] reduce start key=[actors] value=[List(1, 1, 1, 1)] reduce end 1 _ reduce end reduce end reduce end 4 scala reduce end 2 Actor 1 Exit 4 actors Writer#close completed