k-tokitoh

2019-07-13

Rubyのマルチスレッド

プロセスについて学んだのでついでにスレッドも触ってみる。

環境

% ruby -v
ruby 2.3.7p456 (2018-03-28 revision 63024) [x86_64-darwin18]

シングルスレッド

main スレッド内で順番に実行する。

p Time.now  # => 2019-06-30 12:35:46 +0900
sleep(3)
p Time.now  # => 2019-06-30 12:35:49 +0900

出力結果には 3 秒のズレが生じる。

Thread#new

スレッドが実行されるのは、Thread#new/fork/start で生成されたり、Thread#run, Thread#wakeup などが呼ばれたとき。

th = Thread.new do
  p Time.now  # => 2019-06-30 12:36:41 +0900
  sleep(3)
end

p Time.now  # => 2019-06-30 12:36:41 +0900

1 つ目の Time.now と sleep が子スレッドで、2 つ目の Time.now が main スレッドで、並行的に実行されるので出力結果は同時刻となる。

Thread#join

スレッドは基本的に互いに干渉せず実行されるが、Thread#join をつかうとレシーバのスレッドが終了するまで待つ。

th = Thread.new do
  p Time.now  # => 2019-06-30 12:50:05 +0900
  sleep(3)
end

th.join
p Time.now  # => 2019-06-30 12:50:08 +0900

子スレッドの終了を待ってから親スレッドのp Time.nowを実行するので、3 秒のズレが生じる。

メモリの共有

マルチスレッドではメモリを共有する(マルチプロセスでは原則としてメモリを共有しないのと対照される)。

hoge = 3

th = Thread.new do
  hoge = 10
end

th.join
p hoge  # => 10

Thread::Queue

積み上がったタスクを各スレッドがワーカーとして処理するために、Thread::Queue というクラスをつかえる。

q = Thread::Queue.new
[*1..100].each {|n| q.push(n)}

threads = [*1..5].map do |i|
  Thread.new do
    until q.empty?
      n = q.pop
      puts "square of #{n} is #{n**2} (thread #{i})\n"
      sleep(rand(0.01..0.1))
    end
  end
end

threads.each(&:join)

結果

square of 1 is 1 (thread 3)
square of 2 is 4 (thread 4)
square of 3 is 9 (thread 2)
square of 4 is 16 (thread 1)
square of 5 is 25 (thread 5)
square of 6 is 36 (thread 5)
square of 7 is 49 (thread 3)
square of 8 is 64 (thread 1)
...(以下略)

スレッド固有の変数

基本的にはメモリを共有するけど、「このスレッドだけでつかえる変数」がほしいときもある。

スレッド生成時に宣言する

hoge = 3

th = Thread.new do |hoge|
  hoge = 10
end

th.join
p hoge  # => 3

Thread#new にブロック引数を渡すと、そのスレッド固有の変数になる。

Thread#[] で操作する

th = Thread.new do
  sleep(1)
  p Thread.current[:hoge]  # => 10
end

th[:hoge] = 10

th.join

スレッドセーフティ

GVL

現在の実装では VM lock (GVL) を有しており、同時に実行されるネイティブスレッドは常にひとつです。 ただし、IO 関連のブロックする可能性があるシステムコールを行う場合には GVL を解放します。その場合にはスレッドは同時に実行され得ます。

https://docs.ruby-lang.org/ja/latest/doc/spec=2fthread.html

つまり、IO 関連のブロックする可能性があるシステムコールを…

確認していく。

IO 関連のブロックする可能性があるシステムコールを呼ばない場合、コードによりスレッドセーフティを担保する必要はない

以下のコードは明示的にスレッドセーフティを担保されてはいないが、GVL によって結果的にスレッドセーフティが実現している。

n = nil

threads = (1..5).map do |i|
  Thread.new do
    n = i
    100_000.times {}  # a time consuming line
    safe = n == i
  end
end

p threads.map(&:join).map(&:value)  # => [true, true, true, true, true]

IO 関連のブロックする可能性があるシステムコールを呼ぶ場合、コードによりスレッドセーフティを担保する必要がある

以下はスレッドセーフではない。

n = nil

threads = (1..5).map do |i|
  Thread.new do
    n = i
    puts ''
    safe = n == i
  end
end

p threads.map(&:join).map(&:value)  # => [false, false, false, true, false]

ちなみにn = in == iの間に記述された処理にかかる時間は以下のとおり。 データレースが起こる時間的な間隙はtimesの方がずっと長いことがわかる。 にも関わらずデータレースが発生しているのがputsの方だけなのは「前者では GVL がかかり/後者では GVL が解放されたから」と考えられる。1

require 'benchmark'

Benchmark.bm do |x|
  x.report(:times) { 100_000.times {} }
  x.report(:puts) { puts '' }
end
        user     system      total        real
times  0.010000   0.000000   0.010000 (  0.002932)
puts   0.000000   0.000000   0.000000 (  0.000006)

Thread::Mutex

ではどうやってコード上でスレッドセーフティを担保すればよいのか。 そのためのツールとして、Thread::Mutex というクラスが提供されている。

mutex = Thread::Mutex.new
n = nil

threads = (1..5).map do |i|
  Thread.new do
    mutex.lock
    n = i
    puts ''
    safe = n == i
    mutex.unlock
    safe
  end
end

p threads.map(&:join).map(&:value) # => [true, true, true, true, true]

  1. ちなみに10_000_000.timesとかにすると GVL が解放されないはずなのに、データレースが発生した。これはどうやら GVL とは別の、単一のスレッドに余りに長い時間がかかっているとタイマーでスレッドを切り替えるという仕組みによるものみたい。↩