ErlangでMapReduceのフレームワークを作ってみた
%% A minimal MapReduce framework built on plain Erlang processes.
%%
%% execute/4 starts one process per Map input and one per Reduce key, plus
%% one collector process for the shuffle phase and one for the write phase.
-module(mapReduce).

-export([execute/4]).
-export([test/0]).

%% Collector loop.
%%
%% Folds every incoming `{K, V}' message into the accumulator with
%% `Fun(Acc, K, V)'. When the atom `exit' is received, the final accumulator
%% is reported to MonitorPid as `{complete, self(), Acc}' and the loop ends.
loop_receive(MonitorPid, Acc, Fun) ->
    receive
        {K, V} ->
            loop_receive(MonitorPid, Fun(Acc, K, V), Fun);
        exit ->
            MonitorPid ! {complete, self(), Acc}
    end.

%% Returns a fun that starts one Map or Reduce worker for a `{K1, V1}' pair
%% and returns the worker's pid.
%%
%% The worker calls `Process(K1, V1)', which must return a list of
%% `{K2, V2}' tuples, forwards each tuple to OutputPid, and finally sends
%% `{complete, self()}' to MonitorPid. ProcessName is only used in the
%% progress output.
%%
%% The worker is linked to the process that invokes the returned fun, so a
%% crash inside Process terminates that process as well instead of leaving
%% it waiting forever for a `{complete, Pid}' message that will never come.
%% A caller that traps exits receives `{'EXIT', Pid, Reason}' messages
%% instead.
process_parallel(MonitorPid, ProcessName, Process, OutputPid) ->
    fun({K1, V1}) ->
        spawn_link(fun() ->
            io:format("~s start key=~p value=~p~n", [ProcessName, K1, V1]),
            Forward = fun({K2, V2}) -> OutputPid ! {K2, V2} end,
            lists:foreach(Forward, Process(K1, V1)),
            io:format("~s end~n", [ProcessName]),
            MonitorPid ! {complete, self()}
        end)
    end.

%% Asks the collector Pid to finish and returns its final accumulator.
wait_complete(Pid) ->
    Pid ! exit,
    receive
        {complete, Pid, Result} -> Result
    end.

%% Blocks until a `{complete, Pid}' message has been received for every pid
%% in Pids.
wait_all_complete(Pids) ->
    lists:foreach(
        fun(Pid) ->
            receive
                {complete, Pid} -> void
            end
        end,
        Pids
    ).

%% Runs a MapReduce job.
%%
%%   Read   - fun(StartMapProcess) -> [pid()]. Calls
%%            `StartMapProcess({K1, V1})' once per input record and returns
%%            the pids it got back.
%%   Map    - fun(K1, V1) -> [{K2, V2}].
%%   Reduce - fun(K2, [V2]) -> [{K3, V3}].
%%   Write  - fun(K3, V3) -> term(). Called once per Reduce output, inside
%%            the write collector process.
%%
%% Returns `ok' (the result of the final io:format/2 call).
%%
%% NOTE(review): the collectors are told to stop only after every worker has
%% reported completion, which assumes each worker's output messages reach
%% the collector before the coordinator's `exit' message. Erlang documents
%% signal ordering only between a single pair of processes - confirm this is
%% acceptable for the intended deployment.
execute(Read, Map, Reduce, Write) ->
    MonitorPid = self(),

    %% Start the collector that groups Map output by key.
    ShuffleFun = fun(ShufflingDict, K, V) ->
        dict:update(K, fun(L) -> [V | L] end, [V], ShufflingDict)
    end,
    ShufflePid = spawn_link(fun() ->
        loop_receive(MonitorPid, dict:new(), ShuffleFun)
    end),

    %% Run the Map phase and wait for every Map worker.
    StartMapProcess = process_parallel(MonitorPid, "map", Map, ShufflePid),
    wait_all_complete(Read(StartMapProcess)),

    %% Wait for the shuffle collector and fetch the grouped data.
    ShuffledDict = wait_complete(ShufflePid),
    ShuffledList = dict:to_list(ShuffledDict),
    io:format("shuffled~n", []),

    %% Start the collector that hands Reduce output to Write.
    WriteFun = fun(_, K, V) -> Write(K, V) end,
    WritePid = spawn_link(fun() ->
        loop_receive(MonitorPid, void, WriteFun)
    end),

    %% Run the Reduce phase and wait for every Reduce worker.
    StartReduce = process_parallel(MonitorPid, "reduce", Reduce, WritePid),
    wait_all_complete(lists:map(StartReduce, ShuffledList)),

    %% Wait until all output has been written.
    wait_complete(WritePid),
    io:format("completed~n", []).
使用例も昨日のと同じ内容。
%% Reads up to Limit lines from the I/O device In and starts one Map worker
%% per line by calling `StartMapProcess({void, Line})'.
%%
%% Returns the values returned by StartMapProcess, most recent first,
%% prepended to Pids. Reading stops when Limit reaches 0 or at end of file.
%% Raises `error({read_failed, Reason})' if io:get_line/2 reports an error,
%% so that an error tuple is never handed to the Map function as a line.
read_write(_In, 0, _StartMapProcess, Pids) ->
    Pids;
read_write(In, Limit, StartMapProcess, Pids) ->
    case io:get_line(In, "") of
        eof ->
            Pids;
        {error, Reason} ->
            error({read_failed, Reason});
        Line ->
            Pid = StartMapProcess({void, Line}),
            read_write(In, Limit - 1, StartMapProcess, [Pid | Pids])
    end.

%% Word-count example: counts the words in the first 5 lines of the file
%% "MapReduce.scala" (path relative to the current working directory) and
%% prints one "count<TAB>word" line per distinct word.
%% The file is closed even if execute/4 raises an exception.
test() ->
    {ok, In} = file:open("MapReduce.scala", read),
    try
        execute(
            %% Read: => {void, Line}
            fun(StartMapProcess) ->
                read_write(In, 5, StartMapProcess, [])
            end,
            %% Map: {void, Line} => one {Word, 1} per word in the line
            fun(_, Line) ->
                Words = string:tokens(Line, "\n\r\t !\"#$%&\'()*+,-./:;<=>?@[\\]^`{|}~"),
                lists:map(fun(Word) -> {Word, 1} end, Words)
            end,
            %% Reduce: (Word, [1, ...]) => [{Word, Occurrences}]
            fun(Word, Counts) ->
                [{Word, length(Counts)}]
            end,
            %% Write: (Word, Occurrences) => ok
            fun(Word, Count) ->
                io:format("~5b\t~s~n", [Count, Word])
            end
        )
    after
        file:close(In)
    end.
出力例はこうなる。
1> c(mapReduce).
{ok,mapReduce}
2> mapReduce:test().
map start key=void value="import scala.actors.Actor\n"
map end
map start key=void value="import scala.actors.Actor._\n"
map start key=void value="import scala.actors.Exit\n"
map start key=void value="import scala.actors.Futures\n"
map start key=void value="\n"
map end
map end
map end
map end
shuffled
reduce start key="_" value=[1]
reduce start key="scala" value=[1,1,1,1]
reduce start key="Actor" value=[1,1]
reduce start key="Futures" value=[1]
reduce start key="import" value=[1,1,1,1]
reduce start key="Exit" value=[1]
reduce start key="actors" value=[1,1,1,1]
reduce end
    1	_
reduce end
reduce end
reduce end
reduce end
reduce end
reduce end
    4	scala
    2	Actor
    1	Futures
    4	import
    1	Exit
    4	actors
completed
ok
3>