RubyでMapReduceのフレームワークを作ってみた
MapReduceシリーズの第5弾。第4弾まではこちら → Scala / Erlang / Go / F#
アクターの実装には The Omnibus Concurrency Library（`require 'concurrent/actors'`）を使ってみた。Scalaのアクターとほぼ同じ感覚で使える。
require 'concurrent/actors'
include Concurrent::Actors

# Message: one key/value pair flowing between the MapReduce phases.
KeyValue = Case.new :key, :value
# Message: asks an actor built by MapReduce#loop_receive to stop and send
# its accumulator back to +reply_to+.
Exit = Case.new :reply_to
# Message: reply from a stopped actor, carrying its final accumulator.
Complete = Case.new :actor, :result

# Prints progress messages to standard output.
def log(*args)
  puts(*args)
end

# A small MapReduce driver built on actors and threads.
#
# The object running #execute must respond to:
# * read                -> Array of KeyValue inputs for the map phase
# * map(key, value)     -> Array of KeyValue emitted for one input pair
# * reduce(key, values) -> Array of KeyValue emitted for one shuffled key
# * write(key, value)   -> output side effect for one reduced pair
class MapReduce
  # Spawns an actor that folds every incoming KeyValue into an accumulator.
  #
  # acc0  - initial accumulator value (default nil).
  # block - called as block.call(acc, kv); its return value becomes the
  #         new accumulator.
  #
  # On Exit[reply_to] the actor sends Complete[actor, acc] to reply_to and
  # stops receiving.
  #
  # Returns the spawned actor.
  def loop_receive(acc0 = nil, &block)
    Actor.spawn do
      acc = acc0
      finished = false
      # A flag ends the loop instead of `break` inside the Exit handler.
      # That handler is registered through f.when. Breaking out of a block
      # whose defining call has already returned raises LocalJumpError
      # instead of leaving this loop. The flag works no matter when the
      # library runs the handler.
      until finished
        Actor.receive do |f|
          f.when KeyValue do |kv|
            acc = block.call(acc, kv)
          end
          f.when Exit do |e|
            e.reply_to << Complete[Actor.current, acc]
            finished = true
          end
        end
      end
    end
  end

  # Runs the phase method named +process_name+ (:map or :reduce) once per
  # input pair, each call in its own Thread. Every KeyValue the phase
  # returns is sent to +output_actor+.
  #
  # Returns the Array of started threads; the caller must join them.
  def process_parallel(process_name, input, output_actor)
    input.map do |kv|
      Thread.new do
        log "#{process_name} start key=#{kv.key.inspect} value=#{kv.value.inspect}"
        emitted = method(process_name)[kv.key, kv.value]
        # Distinct block name so the input pair +kv+ is not shadowed.
        emitted.each { |pair| output_actor << pair }
        log "#{process_name} end"
      end
    end
  end

  # Tells +actor+ (one created by #loop_receive) to stop, blocks until it
  # replies with Complete, and returns its final accumulator.
  def wait(actor)
    actor << Exit[Actor.current]
    Actor.receive do |f|
      f.when Complete[actor, Object], &:result
    end
  end

  # Runs the whole job: read -> map (parallel) -> shuffle -> reduce
  # (parallel) -> write. Returns once the writer actor has finished.
  def execute
    # Shuffle actor: groups map output values by key. The default block
    # gives every new key its own Array.
    grouping = Hash.new { |hash, key| hash[key] = [] }
    shuffle_actor = loop_receive(grouping) do |groups, kv|
      groups[kv.key] << kv.value
      groups
    end

    # Map phase; join so every emitted pair has been sent before Exit.
    process_parallel(:map, read, shuffle_actor).each(&:join)
    # Exit is sent only after all map threads finished sending, so
    # (assuming a FIFO mailbox) the shuffle actor has seen every pair.
    shuffled_hash = wait(shuffle_actor)
    log 'shuffled'

    # Output actor: writes each reduced pair; its accumulator is unused.
    write_actor = loop_receive { |_, kv| write kv.key, kv.value }

    # Reduce phase over one KeyValue[key, values] per shuffled key.
    shuffled_kv = shuffled_hash.map { |key, values| KeyValue[key, values] }
    process_parallel(:reduce, shuffled_kv, write_actor).each(&:join)

    # Let the writer drain its mailbox before reporting completion.
    wait(write_actor)
    log 'completed'
  end
end
使用例はこちら。executeメソッドに引数として4つのProcオブジェクトを渡す形にしなかったのは、lambda {…} を何度も書くと見づらくなるのを避けるため。代わりに、MapReduceインスタンスの特異クラスに read / map / reduce / write の4メソッドを定義している。
# Word-count job: the four phase methods are defined on the singleton class
# of a single MapReduce instance, and then the job is executed.
class << map_reduce = MapReduce.new
  # Input: the first 5 lines of map_reduce.rb, one KeyValue per line
  # (key nil, value the line without its trailing newline).
  def read
    File.readlines('map_reduce.rb').first(5).map { |line| KeyValue[nil, line.chomp] }
  end

  # Emits KeyValue[word, 1] for every run of word characters in the line.
  # split can yield a leading empty string, which is dropped.
  def map(_, line)
    line.split(/\W+/).reject { |word| word.empty? }.map { |word| KeyValue[word, 1] }
  end

  # Sums the counts emitted for +word+. Summing (instead of counting the
  # values) stays correct even if map ever emits counts other than 1.
  def reduce(word, counts)
    [KeyValue[word, counts.inject(0) { |total, count| total + count }]]
  end

  # Prints one result line: the count right-aligned in 5 columns, a tab,
  # then the word.
  def write(word, count)
    printf "%5d\t%s\n", count, word
  end
end
map_reduce.execute
出力例
map start key=nil value="require 'concurrent/actors'" map start key=nil value="include Concurrent::Actors" map start key=nil value="" map end map end map end map start key=nil value="KeyValue = Case.new :key, :value" map start key=nil value="Exit = Case.new :reply_to" map end map end shuffled reduce start key="new" value=[1, 1] reduce start key="Exit" value=[1] reduce start key="actors" value=[1] reduce end 2 new reduce start key="Concurrent" value=[1] reduce end reduce start key="concurrent" value=[1] reduce end 1 Exit reduce start key="value" value=[1] reduce end reduce start key="include" value=[1] reduce end 1 actors reduce start key="reply_to" value=[1] reduce end reduce start key="Case" value=[1, 1] reduce end 1 Concurrent reduce start key="require" value=[1] reduce end reduce start key="Actors" value=[1] reduce end 1 concurrent reduce start key="key" value=[1] reduce end reduce start key="KeyValue" value=[1] reduce end 1 value reduce end 1 include 1 reply_to 2 Case 1 require 1 Actors 1 key 1 KeyValue reduce end completed