Ruby
 Computer >> コンピューター >  >> プログラミング >> Ruby

Ruby同時実行ツールボックスを開く

並行性と並列性は、Ruby開発者にとってこれまで以上に重要です。それらは、アプリケーションを最大限に活用するハードウェアを利用して、アプリケーションを高速化できます。この記事では、すべてのRubyistが現在利用できるツールと、Rubyがこの部門で間もなく提供することを約束しているツールについて説明します。

誰もが並行性を直接使用するわけではありませんが、Sidekiqなどのツールを介して間接的に並行性を使用します。 Rubyの並行性を理解することは、独自のソリューションを構築するのに役立つだけではありません。既存のものを理解してトラブルシューティングするのに役立ちます。

しかし、最初に一歩下がって全体像を見てみましょう。

並行性と並列性

これらの用語は大まかに使用されますが、明確な意味があります。

  • 並行性: 一度に1つずつ多くのタスクを実行する技術。それらをすばやく切り替えることにより、ユーザーにはそれらが同時に発生しているように見える場合があります。
  • 並列処理: 文字通り同時に多くのタスクを実行します。同時に表示されるのではなく、同時に表示されます。

同時実行性は、IOが重いアプリケーションで最もよく使用されます。たとえば、Webアプリは定期的にデータベースとやり取りしたり、大量のネットワーク要求を行ったりする場合があります。同時実行性を使用することで、データベースがクエリに応答するのを待っている間でも、アプリケーションの応答性を維持できます。

これが可能なのは、Ruby VMが、IO中に待機している間に他のスレッドを実行できるようにするためです。プログラムが数十のリクエストを行う必要がある場合でも、同時実行を使用すると、リクエストはほぼ同時に行われます。

一方、並列処理は現在Rubyではサポートされていません。

なぜRubyに並列処理がないのですか?

現在、デフォルトのRuby実装(一般にMRIまたはCRubyと呼ばれます)を使用して、単一のRubyプロセス内で並列処理を実現する方法はありません。 Ruby VMは、複数のスレッドが同時にRubyコードを実行するのを防ぐロック(GVM、またはグローバルVMロック)を適用します。このロックは、仮想マシンの内部状態を保護し、VMがクラッシュする可能性のあるシナリオを防ぐために存在します。これは最適な場所ではありませんが、すべての希望が失われることはありません。Ruby3が間もなく登場し、コードネームGuild(この記事の最後のセクションで説明)を導入することでこのハンディキャップを解決することを約束します。

スレッド

スレッドはRubyの並行性の主力製品です。それらの使用方法と注意すべき落とし穴をよりよく理解するために、例を示します。 APIを使用し、その結果を同時実行性を使用してデータストアに保存する小さなプログラムを作成します。

APIクライアントを構築する前に、APIが必要です。以下は、数値を受け入れ、提供された数値が奇数の場合にプレーンテキストとして応答する小さなAPIの実装です。構文がおかしいと思われる場合でも、心配する必要はありません。これは並行性とは何の関係もありません。これは、私たちが使用する単なるツールです。

app =
  Proc.new do |env|
    sleep 0.05
    qs = env['QUERY_STRING']
    number = Integer(qs.match(/number=(\d+)/)[1])
    [
      '200',
      { 'Content-Type' => 'text/plain' },
      [number.even? ? 'even' : 'odd']
    ]
  end

run app

このWebアプリを実行するには、ラックgemをインストールしてから、rackup config.ruを実行する必要があります。 。

模擬データストアも必要です。 Key-Valueデータベースをシミュレートするクラスは次のとおりです。

class Datastore
  # ... accessors and initialization omitted ...
  def read(key)
    data[key]
  end

  def write(key, value)
    data[key] = value
  end
end

それでは、並行ソリューションの実装を見ていきましょう。 runというメソッドがあります 、1,000レコードを同時にフェッチし、データストアに保存します。

class ThreadPoweredIntegration
  # ... accessors and initialization ...
  def run
    threads = []
    (1..1000).each_slice(250) do |subset|
      threads << Thread.new do
        subset.each do |number|
          uri = 'https://localhost:9292/' \
            "even_or_odd?number=#{number}"
          status, body = AdHocHTTP.new(uri).blocking_get
          handle_response(status, body)
        rescue Errno::ETIMEDOUT
          retry # Try again if the server times out.
        end
      end
    end
    threads.each(&:join)
  end
  # ...
end

4つのスレッドを作成し、それぞれが250レコードを処理します。この戦略は、サードパーティのAPIや独自のシステムを圧倒しないようにするために使用されます。

複数のスレッドを使用して要求を同時に行うことにより、全体の実行には、順次実装にかかる時間の何分の1かがかかります。 HTTPリクエストを介して確立および通信するために必要なすべてのステップで、各スレッドが非アクティブになる瞬間がありますが、RubyVMでは異なるスレッドの実行を開始できます。これが、この実装がシーケンシャル実装よりもはるかに高速である理由です。

AdHocHTTP classは、この記事のために特別に実装された単純なHTTPクライアントであり、スレッドを利用したコードとファイバーを利用したコードの違いにのみ焦点を当てることができます。その実装について説明することはこの記事の範囲を超えていますが、興味があればここで確認できます。

最後に、内部ループの終わりまでにサーバーの応答を処理します。 handle_responseメソッドの方法は次のとおりです。 見た目:

# ... inside the ThreadPoweredIntegration class ...

attr_reader :ds

def initialize
  @ds = Datastore.new(even: 0, odd: 0)
end

# ...

def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  curr_count = ds.read(key)
  ds.write(key, curr_count + 1)
end

この方法は大丈夫ですね。それを実行して、データストアに何が表示されるかを見てみましょう:

{ even: 497, odd: 489 }

これはかなり奇妙なことです。1から1000の間に、500個の偶数と500個の奇数があると確信しているからです。次のセクションでは、何が起こっているのかを理解し、このバグを解決する方法の1つを簡単に調べてみましょう。

スレッドとデータの競合:悪魔は詳細に宿る

スレッドを使用すると、IOの重いプログラムをはるかに高速に実行できますが、正しく実行するのも困難です。上記の結果のエラーは、handle_responseの競合状態が原因で発生します 方法。競合状態は、2つのスレッドが同じデータを操作するときに発生します。

共有リソース(ds)を操作しているため データストアオブジェクト)、非アトミック操作には特に注意する必要があります。最初にデータストアから読み取り、2番目のステートメントでカウントを1ずつ増やして書き込みます。これは、読み取り後、書き込み前にスレッドの実行が停止する可能性があるため、問題があります。次に、別のスレッドが実行され、関心のあるキーの値が増加した場合、元のスレッドが再開したときに古いカウントが書き込まれます。

スレッドを使用することの危険性を軽減する1つの方法は、より高いレベルの抽象化を使用して並行実装を構造化することです。使用するさまざまなパターンとより安全なスレッド駆動プログラムについては、concurrent-rubygemを確認してください。

データ競合を修正する方法はたくさんあります。簡単な解決策は、ミューテックスを使用することです。この同期メカニズムは、コードの特定のセグメントへの一度に1つのアクセスを強制します。ミューテックスの使用によって修正された以前の実装は次のとおりです。

# ... inside ThreadPoweredIntegration class ...
def initialize
  # ...
  @semaphore = Mutex.new
end
# ...
def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  semaphore.synchronize do
    curr_count = ds.read(key)
    ds.write(key, curr_count + 1)
  end
end

Railsアプリケーション内でスレッドを使用する場合は、公式ガイドRailsでのスレッド化とコード実行 必読です。これらのガイドラインに従わないと、データベース接続のリークなど、非常に不快な結果を招く可能性があります。

修正した実装を実行すると、期待どおりの結果が得られます。

{ even: 500, odd: 500 }

ミューテックスを使用する代わりに、スレッドを完全に削除し、Rubyで利用可能な別の同時実行ツールを利用することで、データの競合を取り除くこともできます。次のセクションでは、IOを多用するアプリのパフォーマンスを向上させるメカニズムとしてFiberを見ていきます。

ファイバー:並行性のための細いツール

Rubyファイバーを使用すると、単一のスレッド内で協調的な同時実行性を実現できます。これは、ファイバーがプリエンプトされず、プログラム自体がスケジューリングを実行する必要があることを意味します。プログラマーがファイバーの開始と停止を制御するため、競合状態を回避するのがはるかに簡単です。

スレッドとは異なり、IOが発生した場合、ファイバーはパフォーマンスを向上させません。幸い、RubyはIOクラスを介して非同期の読み取りと書き込みを提供します。これらの非同期メソッドを使用することで、IO操作がファイバーベースのコードをブロックするのを防ぐことができます。

同じシナリオ、現在はファイバーを使用

同じ例を見てみましょうが、RubyのIOクラスの非同期機能と組み合わせたファイバーを使用しています。 Rubyの非同期IOのすべての詳細を説明することは、この記事の範囲を超えています。それでも、その動作の重要な部分に触れ、興味があれば、AdHocHTTPの関連するメソッド(先ほど調べたスレッドソリューションに表示されるのと同じクライアント)の実装を確認できます。

まず、runを見てみましょう。 ファイバーを利用した実装の方法:

class FiberPoweredIntegration
  # ... accessors and initialization ...
  def run
    (1..1000).each_slice(250) do |subset|
      Fiber.new do
        subset.each do |number|
          uri = 'https://127.0.0.1:9292/' \
            "even_or_odd?number=#{number}"
          client = AdHocHTTP.new(uri)
          socket = client.init_non_blocking_get
          yield_if_waiting(client,
                           socket,
                           :connect_non_blocking_get)
          yield_if_waiting(client,
                           socket,
                           :write_non_blocking_get)
          status, body =
            yield_if_waiting(client,
                             socket,
                             :read_non_blocking_get)
          handle_response(status, body)
        ensure
          client&.close_non_blocking_get
        end
      end.resume
    end

    wait_all_requests
  end
  # ...
end

まず、偶数か奇数かを確認する数値のサブセットごとにファイバーを作成します。

次に、番号をループして、yield_if_waitingを呼び出します。 。この方法は、現在のファイバーを停止し、別のファイバーを再開できるようにする役割を果たします。

ファイバーを作成した後、resumeと呼ぶことにも注意してください。 。これにより、ファイバーが実行を開始します。 resumeを呼び出す 作成直後は、1から1000までのメインループが終了する前でもHTTPリクエストの作成を開始します。

runの最後に メソッド、wait_all_requestsへの呼び出しがあります 。このメソッドは、実行の準備ができているファイバーを選択し、意図したすべての要求を行うことを保証します。このセクションの最後のセグメントでそれを見ていきます。

それでは、yield_if_waitingを見てみましょう。 詳細:

# ... inside FiberPoweredIntegration ...
def initialize
  @ds = Datastore.new(even: 0, odd: 0)
  @waiting = { wait_readable: {}, wait_writable: {} }
end
# ...
def yield_if_waiting(client, socket, operation)
  res_or_status = client.send(operation)
  is_waiting =
    [:wait_readable,
     :wait_writable].include?(res_or_status)
  return res_or_status unless is_waiting

  waiting[res_or_status][socket] = Fiber.current
  Fiber.yield
  waiting[res_or_status].delete(socket)
  yield_if_waiting(client, socket, operation)
rescue Errno::ETIMEDOUT
  retry # Try again if the server times out.
end

まず、クライアントを使用して操作(接続、読み取り、または書き込み)を実行しようとします。 2つの主要な結果が考えられます:

  • 成功: それが起こったら、私たちは戻ります。
  • シンボルを受け取ることができます: これは、私たちが待たなければならないことを意味します。

どのように「待つ」のですか?

  1. 現在のファイバーと組み合わせたソケットをインスタンス変数waitingに追加することで、一種のチェックポイントを作成します。 (これはHash
  2. このペアは、クライアントから返される結果に応じて、読み取りまたは書き込みを待機するIOを保持するコレクション内に格納されます(これが重要である理由はすぐにわかります)。
  3. 現在のファイバーの実行を停止し、別のファイバーを実行できるようにします。一時停止したファイバーは、関連するネットワークソケットの準備ができた後、ある時点で作業を再開する機会を得ます。次に、IO操作が再試行されます(今回は成功します)。

すべてのRubyプログラムは、それ自体がスレッドの一部であるファイバー内で実行されます(プロセス内のすべて)。結果として、最初のファイバーを作成して実行し、ある時点で譲歩すると、プログラムの中央部分の実行が再開されます。

ファイバーがIOを待機しているときに実行を生成するために使用されるメカニズムを理解したので、このファイバーを利用した実装を理解するために必要な最後のビットを調べてみましょう。

def wait_all_requests
  while(waiting[:wait_readable].any? ||
        waiting[:wait_writable].any?)

    ready_to_read, ready_to_write =
      IO.select(waiting[:wait_readable].keys,
                waiting[:wait_writable].keys)

    ready_to_read.each do |socket|
      waiting[:wait_readable][socket].resume
    end

    ready_to_write.each do |socket|
      waiting[:wait_writable][socket].resume
    end
  end
end

ここでの主なアイデアは、保留中のすべてのIO操作が完了するまで待機する(つまり、ループする)ことです。

これを行うには、IO.selectを使用します 。保留中のIOオブジェクトの2つのコレクションを受け入れます。1つは読み取り用、もう1つは書き込み用です。ジョブを終了したIOオブジェクトを返します。これらのIOオブジェクトを、それらの実行を担当するファイバーに関連付けたため、これらのファイバーを再開するのは簡単です。

すべてのリクエストが実行されて完了するまで、これらの手順を繰り返します。

グランドフィナーレ:同等のパフォーマンス、ロックの必要なし

handle_response メソッドは、スレッドを使用するコード(ミューテックスのないバージョン)で最初に使用されたものとまったく同じです。ただし、すべてのファイバーが同じスレッド内で実行されるため、データの競合は発生しません。コードを実行すると、期待どおりの結果が得られます:

{ even: 500, odd: 500 }

非同期IOを活用するたびに、そのすべてのファイバースイッチングビジネスに対処することはおそらく望ましくありません。幸いなことに、一部の宝石はこのすべての作業を抽象化し、ファイバーの使用法を開発者が考える必要のないものにします。素晴らしいスタートとして非同期プロジェクトをチェックしてください。

高スケーラビリティが必須の場合にファイバーが輝く

小規模なシナリオでもデータ競合のリスクを事実上排除するというメリットを享受できますが、ファイバーは高いスケーラビリティが必要な場合に優れたツールです。ファイバーはスレッドよりもはるかに軽量です。同じ利用可能なリソースを考えると、スレッドの作成はファイバーよりもはるかに早くシステムを圧倒します。このトピックに関する優れた調査については、プレゼンテーション The Journey to100万をお勧めします。 RubyコアチームのSamuelWilliamsによる。

ギルド-Rubyでの並列プログラミング

これまで、Rubyでの並行性のための2つの便利なツールを見てきました。ただし、どちらも純粋な計算のパフォーマンスを向上させることはできません。そのためには、現在Rubyには存在しない真の並列処理が必要になります(ここでは、デフォルトの実装であるMRIを検討しています)。

これは、「ギルド」と呼ばれる新機能の登場により、Ruby3で変更される可能性があります。詳細はまだあいまいですが、次のセクションでは、この進行中の機能がRubyでの並列処理を可能にすることをどのように約束するかを見ていきます。

ギルドの仕組み

並行/並列ソリューションを実装する際の大きな問題の原因は、共有メモリです。スレッドのセクションでは、一見無害に見えるかもしれないが実際には微妙なバグが含まれているコードをスリップして書くことがいかに簡単であるかをすでに見てきました。

新しいギルド機能の開発を率いるRubyコアチームのメンバーである笹田耕一は、複数のスレッド間でメモリを共有することの危険性に正面から取り組むソリューションの設計に熱心に取り組んでいます。 2018 RubyConfでのプレゼンテーションで、ギルドを使用する場合、可変オブジェクトを単純に共有することはできないと説明しています。主なアイデアは、不変のオブジェクトを異なるギルド間でのみ共有できるようにすることで、データの競合を防ぐことです。

ギルド間で共有メモリをある程度測定できるように、Rubyに特殊なデータ構造が導入されますが、これがどの程度正確に機能するかについての詳細はまだ完全には具体化されていません。ギルド間でオブジェクトをコピーまたは移動できるようにするAPIに加えて、別のギルドに移動した後にオブジェクトが参照されるのを防ぐためのセーフガードもあります。

ギルドを使用して一般的なシナリオを探索する

並行して実行することで計算を高速化できるようにしたいと思う状況はたくさんあります。同じデータセットの平均と平均を計算する必要があると想像してみましょう。

以下の例は、ギルドでこれを行う方法を示しています。このコードは現在機能しておらず、ギルドがリリースされた後でも機能しない可能性があることに注意してください。

# A frozen array of numeric values is an immutable object.
dataset = [88, 43, 37, 85, 84, 38, 13, 84, 17, 87].freeze
# The overhead of using guilds will probably be
# considerable, so it will only make sense to
# parallelize work when a dataset is large / when
# performing lots of operations.

g1 = Guild.new do
  mean = dataset.reduce(:+).fdiv(dataset.length)
  Guild.send_to(:mean, Guild.parent)
end

g2 = Guild.new do
  median = Median.calculate(dataset.sort)
  Guild.send_to(:median, Guild.parent)
end

results = {}
# Every Ruby program will be run inside a main guild;
# therefore, we can also receive messages in the main
# section of our program.
Guild.receive(:mean, :median) do |tag, result|
  results[tag] = result
end

まとめる

並行性と並列性はRubyの主な強みではありませんが、この部門でも、この言語はほとんどのユースケースを処理するのにおそらく十分なツールを提供します。 Ruby 3が登場し、ギルドプリミティブの導入により状況はかなり良くなるようです。私の意見では、Rubyは依然として多くの状況で非常に適切な選択であり、そのコミュニティは明らかに言語をさらに良くするために懸命に取り組んでいます。今後の予定に耳を傾けましょう!


  1. Ruby内部:Rubyオブジェクトのメモリレイアウトの調査

    Ruby内部のクイックツアーをご希望ですか? その後、あなたは御馳走になります。 なぜなら … Rubyオブジェクトがメモリ内にどのように配置されるか、そして内部データ構造を操作していくつかのクールなことを行う方法を一緒に探求します。 シートベルトを締めて、Rubyインタープリターの奥深くへの旅の準備をしてください! アレイのメモリレイアウト 配列を作成するとき、Rubyはそれをシステムメモリと少しのメタデータでバックアップする必要があります。 メタデータに含まれるもの : 配列サイズ(アイテム数) アレイ容量 クラス オブジェクトのステータス(凍結されているかどうか) データが

  2. Rubyのケースステートメントの多くの用途

    if / elsifを使用する必要があるときはいつでも 代わりにRubyのcaseステートメントを使用することを検討できるステートメント。この投稿では、いくつかの異なるユースケースと、それが実際に内部でどのように機能するかを学びます。 注:他のプログラミング言語では、これはスイッチとして知られています。 ステートメント。 Rubyのcaseステートメントのコンポーネント: キーワード 説明 ケース ケースステートメントの定義を開始します。使用する変数を取得します。 いつ 一致する可能性のあるすべての条件は1つのwhenステートメントです。 その他 一致