I Tried Building a MapReduce Framework(?) in C#
After a three-year break, here is the sixth installment of my MapReduce series. Parts 1 through 5 → Scala / Erlang / Go / F# / Ruby
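This time I wrote it in C# 4.0, and it turned out very simple. That is probably thanks to LINQ and the parallel programming support added in .NET 4: the TPL (Task Parallel Library) and PLINQ, whose AsParallel() does all of the parallelization here.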
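```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class MapReduce
{
    // Runs map -> shuffle -> reduce -> write.
    // read:   input (key, value) pairs
    // map:    turns one input pair into zero or more intermediate pairs
    // reduce: turns one key plus all of its intermediate values into output pairs
    // write:  consumes each output pair (called on the calling thread, one at a time)
    public static void Execute<K1, V1, K2, V2, K3, V3>(
        IEnumerable<Tuple<K1, V1>> read,
        Func<Tuple<K1, V1>, IEnumerable<Tuple<K2, V2>>> map,
        Func<Tuple<K2, IEnumerable<V2>>, IEnumerable<Tuple<K3, V3>>> reduce,
        Action<Tuple<K3, V3>> write)
    {
        // Map phase, run in parallel with PLINQ.
        var mapped = ProcessParallel("map", read, map);

        // Shuffle phase: group the intermediate values by key.
        var shuffled = from grp in (from t in mapped group t.Item2 by t.Item1)
                       select Tuple.Create(grp.Key, grp.AsEnumerable());

        // Reduce phase, also run in parallel.
        var reduced = ProcessParallel("reduce", shuffled, reduce);

        // Write phase: enumerating the lazy query here is what actually runs it.
        foreach (var t in reduced)
        {
            write(t);
        }
    }

    // Applies process to every input pair in parallel and flattens the results,
    // logging the start and end of each call. If process returns a lazy
    // sequence, its items are produced after "end" has been logged.
    private static ParallelQuery<Tuple<K2, V2>> ProcessParallel<K1, V1, K2, V2>(
        string processName,
        IEnumerable<Tuple<K1, V1>> input,
        Func<Tuple<K1, V1>, IEnumerable<Tuple<K2, V2>>> process)
    {
        return input.AsParallel().SelectMany(t =>
        {
            Console.WriteLine("{0} start key=[{1}] value=[{2}]", processName, t.Item1, t.Item2);
            try
            {
                return process(t);
            }
            finally
            {
                Console.WriteLine("{0} end", processName);
            }
        });
    }
}
```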
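The shuffle step in Execute is essentially a LINQ group-by: all intermediate pairs with the same key are collected into one (key, values) tuple, and each tuple becomes one reduce input. As a rough sketch, the program below isolates that step on a few hard-coded pairs. The class name ShuffleSketch and the sample data are made up for illustration, and it uses sequential LINQ (no AsParallel) so the group order is predictable, unlike the parallel version inside Execute.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class: isolates the shuffle step of MapReduce.Execute.
public static class ShuffleSketch
{
    // Groups map output by key, producing one (key, values) tuple per distinct key,
    // the same shape that Execute hands to reduce.
    public static List<Tuple<string, List<int>>> Shuffle(IEnumerable<Tuple<string, int>> mapped)
    {
        return (from t in mapped
                group t.Item2 by t.Item1 into grp
                select Tuple.Create(grp.Key, grp.ToList()))
               .ToList();
    }

    public static void Main()
    {
        // Made-up map output: (word, 1) pairs.
        var mapped = new[]
        {
            Tuple.Create("import", 1),
            Tuple.Create("scala", 1),
            Tuple.Create("import", 1),
            Tuple.Create("Exit", 1),
        };

        foreach (var t in Shuffle(mapped))
        {
            Console.WriteLine("{0}: [{1}]", t.Item1, string.Join(", ", t.Item2));
        }
    }
}
```

Sequential GroupBy keeps keys in order of first appearance, so this prints import: [1, 1], then scala: [1], then Exit: [1].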
Here is a usage example: a word count over the first five lines of MapReduce.scala.
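```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;

// (The statements below go inside Main, or run as top-level statements in C# 9+.)
// read: the first 5 lines of MapReduce.scala, each as a (null, line) pair
var read = from line in File.ReadLines("MapReduce.scala").Take(5)
           select Tuple.Create<object, string>(null, line);
// map: emit (word, 1) for every \w+ match in the line
Func<Tuple<object, string>, IEnumerable<Tuple<string, int>>> map =
    t => from Match m in Regex.Matches(t.Item2, @"\w+")
         select Tuple.Create(m.Value, 1);
// reduce: emit a single (word, number of occurrences) pair
Func<Tuple<string, IEnumerable<int>>, IEnumerable<Tuple<string, int>>> reduce =
    t => Enumerable.Repeat(Tuple.Create(t.Item1, t.Item2.Count()), 1);
// write: count right-aligned in 5 columns, a tab, then the word
Action<Tuple<string, int>> write =
    t => Console.WriteLine("{0,5}\t{1}", t.Item2, t.Item1);
MapReduce.Execute(read, map, reduce, write);
```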
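To see what the map function emits for a single line, the sketch below wraps the same lambda body in a hypothetical MapSketch.Map method and runs it on one of the input lines, import scala.actors.Actor._. Note that \w in .NET regular expressions also matches the underscore (it is connector punctuation), which is why _ shows up as a word in the results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical demo class: applies the article's map logic to a single line.
public static class MapSketch
{
    // Same logic as the map lambda: emit (word, 1) for every \w+ match.
    public static List<Tuple<string, int>> Map(string line)
    {
        return (from Match m in Regex.Matches(line, @"\w+")
                select Tuple.Create(m.Value, 1)).ToList();
    }

    public static void Main()
    {
        foreach (var t in Map("import scala.actors.Actor._"))
        {
            Console.WriteLine("({0}, {1})", t.Item1, t.Item2);
        }
    }
}
```

It prints (import, 1), (scala, 1), (actors, 1), (Actor, 1) and (_, 1), one per line; an empty line yields no pairs at all.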
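Here is sample output. Because map and reduce run in parallel, their log lines interleave and the order of the results can differ from run to run. The value logged at each reduce start is the grouping object itself, so only its type name is printed.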
```
map start key=[] value=[import scala.actors.Actor]
map end
map start key=[] value=[import scala.actors.Actor._]
map start key=[] value=[import scala.actors.Exit]
map end
map start key=[] value=[import scala.actors.Futures]
map end
map start key=[] value=[]
map end
map end
reduce start key=[Exit] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[actors] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[Futures] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[_] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[Actor] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[scala] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce start key=[import] value=[System.Linq.Parallel.GroupByGrouping`2[System.String,System.Int32]]
reduce end
reduce end
reduce end
reduce end
reduce end
    1	Futures
    4	actors
    1	Exit
    2	Actor
    4	scala
reduce end
    1	_
reduce end
    4	import
```
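PLINQ does not preserve ordering by default, so the order of the counts above depends on thread scheduling. One way to get stable output, sketched here under the assumption that sorting by count and then by word is acceptable, is to leave parallel mode with AsSequential() and sort before writing. To stay self-contained, this sketch inlines the map, group-by and reduce steps instead of calling MapReduce.Execute, and hard-codes the five input lines seen in the log; the class name SortedWordCount is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical demo class: PLINQ word count, sorted before output so the
// printed order no longer depends on thread scheduling.
public static class SortedWordCount
{
    public static List<Tuple<string, int>> Count(IEnumerable<string> lines)
    {
        return lines.AsParallel()
            // map: (word, 1) for every \w+ match
            .SelectMany(line => from Match m in Regex.Matches(line, @"\w+")
                                select Tuple.Create(m.Value, 1))
            // shuffle: group the 1s by word
            .GroupBy(t => t.Item1, t => t.Item2)
            // reduce: sum the 1s for each word
            .Select(g => Tuple.Create(g.Key, g.Sum()))
            // back to sequential LINQ, then impose a fixed order:
            // count descending, then word by ordinal comparison
            .AsSequential()
            .OrderByDescending(t => t.Item2)
            .ThenBy(t => t.Item1, StringComparer.Ordinal)
            .ToList();
    }

    public static void Main()
    {
        // The five input lines visible in the log, hard-coded so the
        // sketch does not need MapReduce.scala on disk.
        var lines = new[]
        {
            "import scala.actors.Actor",
            "import scala.actors.Actor._",
            "import scala.actors.Exit",
            "import scala.actors.Futures",
            "",
        };

        foreach (var t in Count(lines))
        {
            Console.WriteLine("{0,5}\t{1}", t.Item2, t.Item1);
        }
    }
}
```

This prints the same seven counts as the run above, but always in the order actors, import, scala, Actor, Exit, Futures, _.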