F#でMapReduceのフレームワークを作ってみた
MapReduceシリーズの第4弾。第3弾まではこちら → Scala / Erlang / Go
フレームワークのソースは、これまでで一番短くなった。ポイントとして、ScalaのActorに似たMailboxProcessorというクラスを使っている。これについては、こちらの記事を参考にさせて頂いた。
ちなみに、はてなダイアリーがF#のシンタックスハイライトに対応していないので、代わりにOCamlのものを適用してある。そのため、「return」「member」などの語がハイライト表示になっていない。
open System open System.Collections.Concurrent open System.Collections.Generic open System.Linq open System.Text.RegularExpressions open System.Threading open System.Threading.Tasks let log (s : string) = Console.WriteLine s (* MailboxProcessorで受け取るメッセージの型 *) type RequestMessage<'Request, 'Reply> = Request of 'Request | Exit of AsyncReplyChannel<'Reply> type MapReduce() = (* 繰り返し処理をする、MailboxProcessorのコア部分 *) let loopReceive process_ createReply (agent : MailboxProcessor<_>) = let rec loop() = async { let! msg = agent.Receive() match msg with | Request(key, value) -> process_ key value return! loop() | Exit replyChannel -> replyChannel.Reply <| createReply() return () } loop() (* Shuffle処理 *) let shuffle (agent : MailboxProcessor<_>) = let dict = ref Map.empty agent |> loopReceive (fun key value -> let values = if Map.containsKey key !dict then (!dict).[key] else [] dict := Map.add key (value :: values) !dict) (fun () -> seq { for kv in !dict -> kv.Key, kv.Value }) (* Map処理およびReduce処理 *) let processParallel processName (input : seq<_>) process_ (outputAgent : MailboxProcessor<_>) = input.AsParallel().ForAll( fun (key, value) -> log <| sprintf "%s start key=%A value=%A" processName key value for kv in process_ key value do outputAgent.Post <| Request kv log <| sprintf "%s end" processName ) outputAgent.PostAndReply Exit (* MapReduce *) member this.Execute read map reduce write = (* Shuffle処理を行うMailboxProcessorを起動する *) let shuffleAgent = MailboxProcessor.Start shuffle (* データを読み込み、Map処理を起動して完了するまで待つ *) let shuffledSeq = processParallel "map" read map shuffleAgent log "shuffled" (* 出力を行うMailboxProcessorを起動する *) let writeAgent = MailboxProcessor.Start <| loopReceive write id (* Reduce処理を起動して完了するまで待つ *) processParallel "reduce" shuffledSeq reduce writeAgent log "completed"
続いて、使用例。題材は第3弾までと同じく、ファイルを行単位で読み込んで単語ごとの出現回数を数えるもの。まあ、MapReduceの解説記事でよく取り上げられるやつだ。
let mapReduce = new MapReduce() mapReduce.Execute (*** read ***) (seq { for line in System.IO.File.ReadLines "MapReduce.scala" -> null, line } |> Seq.take 5) (*** map ***) (fun _ line -> seq { for m in Regex.Matches(line, @"\w+") -> m.Value, 1 }) (*** reduce ***) (fun word counts -> seq { yield word, Seq.sum counts }) (*** write ***) (fun word count -> sprintf "%5d\t%s" count word |> Console.WriteLine)
その出力例。
map start key=<null> value="import scala.actors.Actor" map start key=<null> value="import scala.actors.Exit" map start key=<null> value="import scala.actors.Futures" map start key=<null> value="" map start key=<null> value="import scala.actors.Actor._" map end map end map end map end map end shuffled reduce start key="_" value=[1] reduce start key="Futures" value=[1] reduce start key="Actor" value=[1; 1] reduce end reduce end reduce end reduce start key="import" value=[1; 1; 1; 1] 1 _ reduce end 1 Futures reduce start key="actors" value=[1; 1; 1; 1] 2 Actor reduce end 4 import 4 actors reduce start key="scala" value=[1; 1; 1; 1] reduce end 4 scala reduce start key="Exit" value=[1] reduce end 1 Exit completed