RubyでMapReduceのフレームワークを作ってみた

MapReduceシリーズの第5弾。第4弾まではこちら → Scala / Erlang / Go / F#

The Omnibus Concurrency Library（`require 'concurrent/actors'`）を使ってみた。Scalaのアクターと同じように、アクターへのメッセージ送信（`<<`）と受信（`Actor.receive`）で並行処理を書ける。

require 'concurrent/actors'
include Concurrent::Actors

# Message types exchanged between the actors below (Case classes from
# Concurrent::Actors, matched by pattern in Actor.receive filters).

# One key/value pair of MapReduce data (map input/output, reduce input/output).
KeyValue = Case.new(:key, :value)
# Request asking an accumulating actor to stop and answer to +reply_to+.
Exit = Case.new(:reply_to)
# Reply from an actor that stopped: the actor itself and its final result.
Complete = Case.new(:actor, :result)

# Progress logger: writes every argument to standard output, one per line
# (same semantics as Kernel#puts, including a bare newline for no arguments).
def log(*messages)
  $stdout.puts(*messages)
end

# A small MapReduce framework built on Concurrent::Actors.
#
# #execute calls four hooks that the user must define (e.g. on a singleton
# class of an instance):
#   read              -> collection of KeyValue inputs for the map phase
#   map(key, value)   -> collection of KeyValue intermediate pairs
#   reduce(key, list) -> collection of KeyValue final pairs
#   write(key, value) -> called for each final pair; return value is ignored
class MapReduce

  # Spawns an actor that folds every incoming KeyValue message into an
  # accumulator via +block+ (called as block.call(acc, kv); its return value
  # becomes the new accumulator). On Exit[reply_to] the actor sends
  # Complete[actor, acc] to reply_to and then stops.
  #
  # acc0 - initial accumulator value (default nil).
  # Returns the spawned actor.
  def loop_receive(acc0 = nil, &block)
    Actor.spawn do
      acc = acc0
      running = true
      # `break` inside an f.when handler targets the f.when call itself. If
      # Actor.receive runs the stored handler after f.when has returned
      # (NOTE(review): as its filter API suggests), Ruby raises LocalJumpError
      # instead of leaving the loop. An explicit flag ends the loop cleanly.
      while running
        Actor.receive do |f|
          f.when(KeyValue) { |kv| acc = block.call(acc, kv) }
          f.when(Exit) do |e|
            e.reply_to << Complete[Actor.current, acc]
            running = false
          end
        end
      end
    end
  end

  # Runs the hook named +process_name+ (:map or :reduce) once per KeyValue in
  # +input+, each in its own thread, and sends every KeyValue the hook returns
  # to +output_actor+.
  #
  # Returns the Array of started threads; the caller must join them.
  def process_parallel(process_name, input, output_actor)
    input.map do |kv|
      Thread.new do
        log "#{process_name} start key=#{kv.key.inspect} value=#{kv.value.inspect}"
        results = method(process_name).call(kv.key, kv.value)
        # Distinct name so the outer +kv+ is not shadowed (or, on Ruby 1.8,
        # overwritten).
        results.each { |out| output_actor << out }
        log "#{process_name} end"
      end
    end
  end

  # Sends Exit to +actor+ and blocks until that actor answers with Complete.
  #
  # Returns the actor's final accumulator (Complete#result).
  def wait(actor)
    actor << Exit[Actor.current]
    Actor.receive do |f|
      f.when(Complete[actor, Object], &:result)
    end
  end

  # Runs the whole pipeline: read -> map (parallel) -> shuffle ->
  # reduce (parallel) -> write, logging progress via #log.
  def execute
    # Shuffle actor: groups intermediate values by key (key => [values]).
    shuffling_hash = Hash.new { |hash, k| hash[k] = [] }
    shuffle_actor = loop_receive(shuffling_hash) do |hash, kv|
      hash[kv.key] << kv.value
      hash
    end

    # Map phase. Every map thread is joined before Exit is sent, so no
    # intermediate pair can be produced after the shuffle actor is told to stop.
    map_threads = process_parallel(:map, read, shuffle_actor)
    map_threads.each(&:join)
    # Stop the shuffle actor and take its grouped hash.
    shuffled_hash = wait(shuffle_actor)
    log 'shuffled'

    # Writer actor: calls #write for every final KeyValue it receives.
    write_actor = loop_receive { |_, kv| write(kv.key, kv.value) }

    # Reduce phase over each key and its grouped values.
    shuffled_kv = shuffled_hash.map { |k, v| KeyValue[k, v] }
    reduce_threads = process_parallel(:reduce, shuffled_kv, write_actor)
    reduce_threads.each(&:join)
    # Stop the writer only after all reduce threads have produced their output.
    wait(write_actor)
    log 'completed'
  end
end

使用例はこちら。read / map / reduce / write の4つのメソッドは、MapReduceインスタンスの特異クラス（`class << map_reduce`）に定義する。executeメソッドの引数として4つのProcオブジェクトを渡す形にしなかったのは、lambda {…} を繰り返し書くと見づらくなるのを避けるため。

# Word-count example: the four MapReduce hooks are defined as singleton
# methods of one instance rather than passed to #execute as lambdas.
class << map_reduce = MapReduce.new
  # Input: the first five lines of map_reduce.rb, each as KeyValue[nil, line].
  def read
    File.readlines('map_reduce.rb').first(5).map { |line| KeyValue[nil, line.chomp] }
  end

  # Emits KeyValue[word, 1] for every run of word characters in +line+
  # (scan(/\w+/) yields exactly the non-empty pieces of split(/\W+/)).
  def map(_, line)
    line.scan(/\w+/).map { |word| KeyValue[word, 1] }
  end

  # Sums the partial counts for +word+. Summing, rather than counting the
  # list's elements, stays correct if map ever emits counts other than 1.
  def reduce(word, counts)
    [KeyValue[word, counts.inject(0, :+)]]
  end

  # Prints "count<TAB>word" with the count right-aligned to 5 columns.
  def write(word, count)
    printf "%5d\t%s\n", count, word
  end
end

map_reduce.execute

出力例

map start key=nil value="require 'concurrent/actors'"
map start key=nil value="include Concurrent::Actors"
map start key=nil value=""
map end
map end
map end
map start key=nil value="KeyValue = Case.new :key, :value"
map start key=nil value="Exit = Case.new :reply_to"
map end
map end
shuffled
reduce start key="new" value=[1, 1]
reduce start key="Exit" value=[1]
reduce start key="actors" value=[1]
reduce end
    2   new
reduce start key="Concurrent" value=[1]
reduce end
reduce start key="concurrent" value=[1]
reduce end
    1   Exit
reduce start key="value" value=[1]
reduce end
reduce start key="include" value=[1]
reduce end
    1   actors
reduce start key="reply_to" value=[1]
reduce end
reduce start key="Case" value=[1, 1]
reduce end
    1   Concurrent
reduce start key="require" value=[1]
reduce end
reduce start key="Actors" value=[1]
reduce end
    1   concurrent
reduce start key="key" value=[1]
reduce end
reduce start key="KeyValue" value=[1]
reduce end
    1   value
reduce end
    1   include
    1   reply_to
    2   Case
    1   require
    1   Actors
    1   key
    1   KeyValue
reduce end
completed