F#でMapReduceのフレームワークを作ってみた

MapReduceシリーズの第4弾。第3弾まではこちら → Scala / Erlang / Go

フレームワークのソースは、これまでで一番短くなった。ポイントとして、ScalaのActorに似たMailboxProcessorというクラスを使っている。これについては、こちらの記事を参考にさせて頂いた。
ちなみに、はてなダイアリーがF#のシンタックスハイライトに対応していないので、代わりにOCamlのものを適用してある。そのため、「return」「member」などの語がハイライト表示になっていない。

open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.Linq
open System.Text.RegularExpressions
open System.Threading
open System.Threading.Tasks

let log (s : string) = Console.WriteLine s

(* MailboxProcessorで受け取るメッセージの型 *)
type RequestMessage<'Request, 'Reply> = Request of 'Request
                                      | Exit of AsyncReplyChannel<'Reply>

type MapReduce() =
    (* 繰り返し処理をする、MailboxProcessorのコア部分 *)
    let loopReceive process_ createReply (agent : MailboxProcessor<_>) =
        let rec loop() = async {
                             let! msg = agent.Receive()
                             match msg with
                             | Request(key, value) -> process_ key value
                                                      return! loop()
                             | Exit replyChannel   -> replyChannel.Reply <| createReply()
                                                      return ()
                         }
        loop()

    (* Shuffle処理 *)
    let shuffle (agent : MailboxProcessor<_>) =
        let dict = ref Map.empty
        agent |> loopReceive
            (fun key value -> let values = if Map.containsKey key !dict then (!dict).[key] else []
                              dict := Map.add key (value :: values) !dict)
            (fun () -> seq { for kv in !dict -> kv.Key, kv.Value })

    (* Map処理およびReduce処理 *)
    let processParallel processName (input : seq<_>) process_ (outputAgent : MailboxProcessor<_>) =
        input.AsParallel().ForAll(
            fun (key, value) -> log <| sprintf "%s start key=%A value=%A" processName key value
                                for kv in process_ key value do outputAgent.Post <| Request kv
                                log <| sprintf "%s end" processName
        )
        outputAgent.PostAndReply Exit

    (* MapReduce *)
    member this.Execute read map reduce write =
        (* Shuffle処理を行うMailboxProcessorを起動する *)
        let shuffleAgent = MailboxProcessor.Start shuffle

        (* データを読み込み、Map処理を起動して完了するまで待つ *)
        let shuffledSeq = processParallel "map" read map shuffleAgent
        log "shuffled"

        (* 出力を行うMailboxProcessorを起動する *)
        let writeAgent = MailboxProcessor.Start <| loopReceive write id

        (* Reduce処理を起動して完了するまで待つ *)
        processParallel "reduce" shuffledSeq reduce writeAgent
        log "completed"

続いて、使用例。題材は第3弾までと同じく、ファイルを行単位で読み込んで単語ごとの出現回数を数えるもの。まあ、MapReduceの解説記事でよく取り上げられるやつだ。

    let mapReduce = new MapReduce()
    mapReduce.Execute
        (*** read ***)
        (seq { for line in System.IO.File.ReadLines "MapReduce.scala" -> null, line } |> Seq.take 5)
        (*** map ***)
        (fun _ line -> seq { for m in Regex.Matches(line, @"\w+") -> m.Value, 1 })
        (*** reduce ***)
        (fun word counts -> seq { yield word, Seq.sum counts })
        (*** write ***)
        (fun word count -> sprintf "%5d\t%s" count word |> Console.WriteLine)

その出力例。

map start key=<null> value="import scala.actors.Actor"
map start key=<null> value="import scala.actors.Exit"
map start key=<null> value="import scala.actors.Futures"
map start key=<null> value=""
map start key=<null> value="import scala.actors.Actor._"
map end
map end
map end
map end
map end
shuffled
reduce start key="_" value=[1]
reduce start key="Futures" value=[1]
reduce start key="Actor" value=[1; 1]
reduce end
reduce end
reduce end
reduce start key="import" value=[1; 1; 1; 1]
    1   _
reduce end
    1   Futures
reduce start key="actors" value=[1; 1; 1; 1]
    2   Actor
reduce end
    4   import
    4   actors
reduce start key="scala" value=[1; 1; 1; 1]
reduce end
    4   scala
reduce start key="Exit" value=[1]
reduce end
    1   Exit
completed