C#でMapReduceのフレームワーク?を作ってみた

3年ぶりに、MapReduceシリーズの第6弾。第5弾まではこちら → Scala / Erlang / Go / F# / Ruby

C# 4.0で作ってみたが、とてもシンプルになった。LINQとTPL(タスク並列ライブラリ)のおかげか。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

/// <summary>
/// Minimal in-process MapReduce driver built on PLINQ.
/// Records are modelled as key/value <see cref="Tuple{T1,T2}"/> instances.
/// </summary>
public class MapReduce {

    /// <summary>
    /// Runs a complete map / shuffle / reduce pass over <paramref name="read"/>
    /// and hands every reduced record to <paramref name="write"/>.
    /// </summary>
    /// <param name="read">Input records.</param>
    /// <param name="map">Turns one input record into zero or more intermediate records.</param>
    /// <param name="reduce">
    /// Receives one intermediate key together with all values emitted for that key
    /// and produces zero or more output records.
    /// </param>
    /// <param name="write">Consumes each output record; called from the foreach loop in this method.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static void Execute<K1, V1, K2, V2, K3, V3>(
            IEnumerable<Tuple<K1, V1>> read,
            Func<Tuple<K1, V1>, IEnumerable<Tuple<K2, V2>>> map,
            Func<Tuple<K2, IEnumerable<V2>>, IEnumerable<Tuple<K3, V3>>> reduce,
            Action<Tuple<K3, V3>> write) {

        // Fail fast: without these checks a null delegate only surfaces later,
        // from inside the (lazily executed) pipeline.
        if (read == null) {
            throw new ArgumentNullException("read");
        }
        if (map == null) {
            throw new ArgumentNullException("map");
        }
        if (reduce == null) {
            throw new ArgumentNullException("reduce");
        }
        if (write == null) {
            throw new ArgumentNullException("write");
        }

        var mapped = ProcessParallel("map", read, map);
        // Shuffle: collect every value emitted under the same intermediate key.
        var shuffled = from grp in (from t in mapped group t.Item2 by t.Item1)
                       select Tuple.Create(grp.Key, grp.AsEnumerable());
        var reduced = ProcessParallel("reduce", shuffled, reduce);
        foreach (var t in reduced) {
            write(t);
        }
    }

    /// <summary>
    /// Applies <paramref name="process"/> to every record of <paramref name="input"/>
    /// through a parallel query and flattens the results, logging the start and
    /// end of each invocation to the console.
    /// </summary>
    /// <param name="processName">Label used in the log lines (e.g. "map").</param>
    /// <param name="input">Records to process.</param>
    /// <param name="process">Per-record function returning zero or more records.</param>
    /// <returns>A parallel query over all records produced by <paramref name="process"/>.</returns>
    private static ParallelQuery<Tuple<K2, V2>> ProcessParallel<K1, V1, K2, V2>(
            string processName, 
            IEnumerable<Tuple<K1, V1>> input,
            Func<Tuple<K1, V1>, IEnumerable<Tuple<K2, V2>>> process) {

        return input.AsParallel().SelectMany(t => {
            Console.WriteLine("{0} start key=[{1}] value=[{2}]", processName, t.Item1, t.Item2);
            try {
                // Materialize here: process() typically returns a lazy sequence, so
                // without ToList() the real work would run after "end" was logged
                // and outside this try/finally.
                return process(t).ToList();
            } finally {
                Console.WriteLine("{0} end", processName);
            }
        });
    }
}

使用例はこちら。

// Word-count example: the first five lines of the file become (null, line) records.
IEnumerable<Tuple<object, string>> read =
    File.ReadLines("MapReduce.scala")
        .Take(5)
        .Select(text => Tuple.Create<object, string>(null, text));

// map: emit (word, 1) for every \w+ match in the line.
Func<Tuple<object, string>, IEnumerable<Tuple<string, int>>> map =
    record => Regex.Matches(record.Item2, @"\w+")
                   .Cast<Match>()
                   .Select(hit => Tuple.Create(hit.Value, 1));

// reduce: emit a single (word, number of values received) record.
Func<Tuple<string, IEnumerable<int>>, IEnumerable<Tuple<string, int>>> reduce =
    group => new[] { Tuple.Create(group.Item1, group.Item2.Count()) };

// write: print the count right-aligned in five columns, a tab, then the word.
Action<Tuple<string, int>> write =
    result => Console.WriteLine("{0,5}\t{1}", result.Item2, result.Item1);

MapReduce.Execute(read, map, reduce, write);

出力例はこちら。

map start key=[] value=[import scala.actors.Actor]
map end
map start key=[] value=[import scala.actors.Actor._]
map start key=[] value=[import scala.actors.Exit]
map end
map start key=[] value=[import scala.actors.Futures]
map end
map start key=[] value=[]
map end
map end
reduce start key=[Exit] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[actors] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[Futures] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[_] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[Actor] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[scala] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[import] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce end
reduce end
reduce end
reduce end
reduce end
    1   Futures
    4   actors
    1   Exit
    2   Actor
    4   scala
reduce end
    1   _
reduce end
    4   import