ErlangでMapReduceのフレームワークを作ってみた

昨日の記事でやった内容のErlangバージョン。

-module(mapReduce).
-export([execute/4]).
-export([test/0]).

%% Accumulating receive loop.
%% Every {Key, Value} message is folded into the accumulator via
%% Fun(Acc, Key, Value). On the atom `exit` the loop stops and reports the
%% final accumulator to MonitorPid as {complete, self(), Acc}.
loop_receive(MonitorPid, Acc, Fun) ->
    receive
        exit ->
            MonitorPid ! {complete, self(), Acc};
        {Key, Value} ->
            NextAcc = Fun(Acc, Key, Value),
            loop_receive(MonitorPid, NextAcc, Fun)
    end.

%% Returns a fun that starts one worker process for a {Key, Value} input
%% (used for both the map and the reduce phase).
%%
%% MonitorPid  - receives {complete, WorkerPid} when the worker has finished
%% ProcessName - label used in the start/end log lines
%% Process     - fun(Key, Value) returning a list of {OutKey, OutValue} tuples
%% OutputPid   - receives each {OutKey, OutValue} produced by Process
%%
%% The returned fun yields the pid of the worker it started.
process_parallel(MonitorPid, ProcessName, Process, OutputPid) ->
    fun({K1, V1}) ->
        %% Linked (not a bare spawn) to the process calling this fun: if
        %% Process/2 crashes, the worker would otherwise die without ever
        %% sending {complete, self()}, and whoever waits for that message
        %% would block forever. With the link the failure propagates.
        spawn_link(fun() ->
                  io:format("~s start key=~p value=~p~n", [ProcessName, K1, V1]),
                  Emit = fun({K2, V2}) -> OutputPid ! {K2, V2} end,
                  lists:foreach(Emit, Process(K1, V1)),
                  io:format("~s end~n", [ProcessName]),
                  MonitorPid ! {complete, self()}
              end)
    end.

%% Asks a collector process to finish and waits for its final result.
%% Sends the atom `exit` to Pid, then blocks until Pid answers with
%% {complete, Pid, Result}; that Result is the return value.
wait_complete(Pid) ->
    Pid ! exit,
    receive
        {complete, Pid, Final} -> Final
    end.

%% Blocks until a {complete, Pid} message has been received for every pid
%% in the list, consuming them in list order. Returns ok.
wait_all_complete([]) ->
    ok;
wait_all_complete([Pid | Rest]) ->
    receive
        {complete, Pid} -> wait_all_complete(Rest)
    end.

%% MapReduce driver. Runs the map phase, shuffles the results by key, runs
%% the reduce phase and hands every reduced pair to Write. Returns ok.
%%
%% Read   - fun(StartMapProcess): calls StartMapProcess({K, V}) for each
%%          input record and returns the list of pids those calls yielded
%% Map    - fun(K, V) returning a list of {K2, V2} tuples
%% Reduce - fun(K2, ListOfV2) returning a list of {K3, V3} tuples
%% Write  - fun(K3, V3), called once per reduced pair in the writer process
%%
%% The shuffle and writer collectors are linked to the calling process: a
%% crash inside one of them (e.g. Write/2 raising) would otherwise leave
%% wait_complete/1 blocked forever, and a dying caller would leave the
%% collectors behind. A caller that traps exits will see their 'EXIT'
%% messages in its mailbox.
execute(Read, Map, Reduce, Write) ->
    MonitorPid = self(),

    %% Start the process that collects the map output, grouping values by key
    ShuffleFun = fun(ShufflingDict, K, V) ->
                     dict:update(K, fun(L) -> [V|L] end, [V], ShufflingDict)
                 end,
    ShufflePid = spawn_link(fun() -> loop_receive(MonitorPid, dict:new(), ShuffleFun) end),

    %% Run the map phase and wait for every map worker
    StartMapProcess = process_parallel(MonitorPid, "map", Map, ShufflePid),
    wait_all_complete(Read(StartMapProcess)),
    %% Wait for the shuffle collector to hand back the grouped data
    ShuffledDict = wait_complete(ShufflePid),
    ShuffledList = dict:to_list(ShuffledDict),
    io:format("shuffled~n", []),

    %% Start the process that receives the reduce output and writes it out;
    %% its accumulator is unused, so each step just calls Write/2
    WriteFun = fun(_, K, V) -> Write(K, V) end,
    WritePid = spawn_link(fun() -> loop_receive(MonitorPid, void, WriteFun) end),

    %% Run the reduce phase, one worker per shuffled key
    StartReduce = process_parallel(MonitorPid, "reduce", Reduce, WritePid),
    wait_all_complete(lists:map(StartReduce, ShuffledList)),
    %% Wait until all output has been written
    wait_complete(WritePid),
    io:format("completed~n", []).

使用例も昨日のと同じ内容。

%% Reads at most Limit lines from the io device In, passing each one to
%% StartMapProcess as {void, Line}. The values returned by StartMapProcess
%% are prepended to Pids; the accumulated list is returned once the limit
%% reaches 0 or io:get_line/2 reports eof.
read_write(_In, 0, _StartMapProcess, Pids) ->
    Pids;
read_write(In, Limit, StartMapProcess, Pids) ->
    case io:get_line(In, "") of
        eof ->
            Pids;
        Line ->
            Started = StartMapProcess({void, Line}),
            read_write(In, Limit - 1, StartMapProcess, [Started | Pids])
    end.

%% Word-count demo: runs execute/4 over the first 5 lines of
%% "MapReduce.scala" and prints each word with its number of occurrences.
%% The file is closed whether or not execute/4 succeeds.
test() ->
    {ok, In} = file:open("MapReduce.scala", read),
    %% Read: => {void, Line}
    Read = fun(StartMapProcess) -> read_write(In, 5, StartMapProcess, []) end,
    %% Map: {void, Line} => one {Word, 1} per word in the line
    Map = fun(_, Line) ->
              Words = string:tokens(Line, "\n\r\t !\"#$%&\'()*+,-./:;<=>?@[\\]^`{|}~"),
              [{Word, 1} || Word <- Words]
          end,
    %% Reduce: (Word, list with one 1 per occurrence) => a single {Word, Count}
    Reduce = fun(Word, Counts) -> [{Word, length(Counts)}] end,
    %% Write: (Word, Count) => ok
    Write = fun(Word, Count) -> io:format("~5b\t~s~n", [Count, Word]) end,
    try
        execute(Read, Map, Reduce, Write)
    after
        file:close(In)
    end.

出力例はこうなる。

1> c(mapReduce).
{ok,mapReduce}
2> mapReduce:test().
map start key=void value="import scala.actors.Actor\n"
map end
map start key=void value="import scala.actors.Actor._\n"
map start key=void value="import scala.actors.Exit\n"
map start key=void value="import scala.actors.Futures\n"
map start key=void value="\n"
map end
map end
map end
map end
shuffled
reduce start key="_" value=[1]
reduce start key="scala" value=[1,1,1,1]
reduce start key="Actor" value=[1,1]
reduce start key="Futures" value=[1]
reduce start key="import" value=[1,1,1,1]
reduce start key="Exit" value=[1]
reduce start key="actors" value=[1,1,1,1]
reduce end
    1   _
reduce end
reduce end
reduce end
reduce end
reduce end
reduce end
    4   scala
    2   Actor
    1   Futures
    4   import
    1   Exit
    4   actors
completed
ok
3>