Rubyのバックグラウンド処理システムである構築による学習
今日の投稿では、楽しみのために素朴なバックグラウンド処理システムを実装します! Sidekiqのような人気のあるバックグラウンド処理システムの内部を覗き見しながら、途中でいくつかのことを学ぶかもしれません。この楽しみの産物は、決して本番環境での使用を目的としたものではありません。
アプリケーションに、1つ以上のWebサイトをロードし、それらのタイトルを抽出するタスクがあると想像してみてください。これらのウェブサイトのパフォーマンスには影響を与えないため、メインスレッド(またはウェブアプリケーションを構築している場合は現在のリクエスト)の外部で、バックグラウンドでタスクを実行したいと思います。
タスクのカプセル化
バックグラウンド処理に入る前に、手元のタスクを実行するためのサービスオブジェクトを作成しましょう。 OpenURIとNokogiriを使用して、タイトルタグの内容を抽出します。
require 'open-uri'
require 'nokogiri'
class TitleExtractorService
def call(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
サービスを呼び出すと、指定されたURLのタイトルが出力されます。
TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
これは期待どおりに機能しますが、構文を少し改善して、他のバックグラウンド処理システムのように見えるようにすることができるかどうかを見てみましょう。 Magique::Worker
を作成する モジュールでは、サービスオブジェクトにシンタックスシュガーを追加できます。
module Magique
module Worker
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_now(*args)
new.perform(*args)
end
end
def perform(*)
raise NotImplementedError
end
end
end
モジュールはperform
を追加します ワーカーインスタンスへのメソッドとperform_now
呼び出しを少し良くするためのworkerクラスへのメソッド。
モジュールをサービスオブジェクトに含めましょう。その間、名前をTitleExtractorWorker
に変更しましょう。 call
を変更します perform
メソッド 。
class TitleExtractorWorker
include Magique::Worker
def perform(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
呼び出しでも同じ結果が得られますが、何が起こっているのかが少し明確になります。
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
非同期処理の実装
タイトル抽出が機能するようになったので、過去のRubyMagicの記事からすべてのタイトルを取得できます。これを行うために、RUBYMAGIC
があると仮定しましょう。 過去の記事のすべてのURLのリストで定数。
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_now(url)
end
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
過去の記事のタイトルを取得していますが、すべて抽出するには時間がかかります。これは、各リクエストが完了するまで待ってから、次のリクエストに進むためです。
perform_async
を導入して、これを改善しましょう。 ワーカーモジュールへのメソッド。処理を高速化するために、URLごとに新しいスレッドを作成します。
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Thread.new { new.perform(*args) }
end
end
end
end
呼び出しをTitleExtractorWorker.perform_async(url)
に変更した後 、ほぼ一度にすべてのタイトルを取得します。ただし、これは、RubyMagicブログへの接続を一度に20以上開くことも意味します。 (ブログをいじってすみません、皆さん! 😅)
独自の実装に従って、長時間実行されるプロセス(Webサーバーなど)の外部でこれをテストする場合は、loop { sleep 1 }
のようなものを追加することを忘れないでください。 スクリプトの最後まで、プロセスがすぐに終了しないようにします。
タスクのキューイング
呼び出しごとに新しいスレッドを作成するというアプローチでは、最終的にリソースの制限に達します(私たちの側とアクセスしているWebサイトの両方で)。私たちは善良な市民になりたいので、実装を非同期であるがサービス拒否攻撃のようには感じられないものに変更しましょう。
この問題を解決する一般的な方法は、生産者/消費者パターンを使用することです。 1人以上のプロデューサーがタスクをキューにプッシュし、1人以上のコンシューマーがキューからタスクを取得して処理します。
キューは基本的に要素のリストです。理論的には、単純な配列で十分です。ただし、同時実行性を扱っているため、一度に1つのプロデューサーまたはコンシューマーのみがキューにアクセスできるようにする必要があります。これに注意しないと、2人が同時にドアを押し込もうとするように、事態は混乱してしまいます。
この問題は生産者/消費者問題として知られており、複数の解決策があります。幸い、これは非常に一般的な問題であり、Rubyには適切なQueue
が付属しています。 スレッドの同期を気にすることなく使用できる実装。
これを使用するには、プロデューサーとコンシューマーの両方がキューにアクセスできることを確認しましょう。これを行うには、Magique
にクラスメソッドを追加します。 モジュールとQueue
のインスタンスの割り当て それに。
module Magique
def self.backend
@backend
end
def self.backend=(backend)
@backend = backend
end
end
Magique.backend = Queue.new
次に、perform_async
を変更します 独自の新しいスレッドを作成する代わりに、タスクをキューにプッシュする実装。タスクは、ワーカークラスへの参照と、perform_async
に渡される引数を含むハッシュとして表されます。 メソッド。
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Magique.backend.push(worker: self, args: args)
end
end
end
end
これで、プロデューサー側の作業は完了です。次に、消費者側を見てみましょう。
各コンシューマーは、キューからタスクを取得して実行する個別のスレッドです。スレッドのように1つのタスクの後で停止する代わりに、コンシューマーはキューから別のタスクを取得して実行します。 Magique::Processor
と呼ばれるコンシューマーの基本的な実装は次のとおりです。 。各プロセッサは、無限にループする新しいスレッドを作成します。反復ごとに、キューから新しいタスクを取得しようとし、ワーカークラスの新しいインスタンスを作成し、そのperform
を呼び出します。 指定された引数を持つメソッド。
module Magique
class Processor
def self.start(concurrency = 1)
concurrency.times { |n| new("Processor #{n}") }
end
def initialize(name)
thread = Thread.new do
loop do
payload = Magique.backend.pop
worker_class = payload[:worker]
worker_class.new.perform(*payload[:args])
end
end
thread.name = name
end
end
end
処理ループに加えて、Magique::Processor.start
という便利なメソッドを追加します。 。これにより、一度に複数のプロセッサを起動できます。スレッドに名前を付ける必要はありませんが、実際に期待どおりに機能しているかどうかを確認できます。
TitleExtractorWorker
の出力を調整しましょう 現在のスレッドの名前を含める。
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
バックグラウンド処理のセットアップをテストするには、タスクをキューに入れる前に、まず一連のプロセッサを起動する必要があります。
Magique.backend = Queue.new
Magique::Processor.start(5)
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
これを実行しても、すべての記事のタイトルが取得されます。タスクごとに個別のスレッドを使用するほど高速ではありませんが、バックグラウンド処理を行わなかった最初の実装よりも高速です。追加されたプロセッサ名のおかげで、すべてのプロセッサがキューを介して動作していることも確認できます。並行プロセッサの数を微調整することで、処理速度と既存のリソース制限のバランスを見つけることができます。
複数のプロセスとマシンへの拡張
これまでのところ、バックグラウンド処理システムの現在の実装は十分に機能しています。ただし、それでも同じプロセスに制限されます。リソースを大量に消費するタスクは、プロセス全体のパフォーマンスに影響を及ぼします。最後のステップとして、ワークロードを複数のプロセス、場合によっては複数のマシンに分散する方法を見てみましょう。
キューは、プロデューサーとコンシューマーの間の唯一の接続です。現在、メモリ内の実装を使用しています。 Sidekiqからさらにインスピレーションを得て、Redisを使用してキューを実装しましょう。
Redisは、タスクのプッシュとフェッチを可能にするリストをサポートしています。さらに、Redis Ruby gemはスレッドセーフであり、リストを変更するRedisコマンドはアトミックです。これらのプロパティにより、同期の問題が発生することなく、非同期バックグラウンド処理システムで使用できるようになります。
push
を実装するRedisバックキューを作成しましょう およびshift
Queue
のようなメソッド 以前使用しました。
require 'json'
require 'redis'
module Magique
module Backend
class Redis
def initialize(connection = ::Redis.new)
@connection = connection
end
def push(job)
@connection.lpush('magique:queue', JSON.dump(job))
end
def shift
_queue, job = @connection.brpop('magique:queue')
payload = JSON.parse(job, symbolize_names: true)
payload[:worker] = Object.const_get(payload[:worker])
payload
end
end
end
end
RedisはRubyオブジェクトについて何も知らないため、lpush
を使用してタスクをデータベースに保存する前に、タスクをJSONにシリアル化する必要があります。 リストの先頭に要素を追加するコマンド。
キューからタスクをフェッチするために、brpop
を使用しています コマンド。リストから最後の要素を取得します。リストが空の場合、新しい要素が利用可能になるまでリストはブロックされます。これは、使用可能なタスクがないときにプロセッサを一時停止するための優れた方法です。最後に、Redisからタスクを取得した後、Object.const_get
を使用して、ワーカーの名前に基づいて実際のRubyクラスを検索する必要があります。 。
最後のステップとして、物事を複数のプロセスに分割しましょう。プロデューサー側では、バックエンドを新しく実装されたRedisキューに変更するだけです。
# ...
Magique.backend = Magique::Backend::Redis.new
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
物事の消費者側では、次のような数行で逃げることができます:
# ...
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
loop { sleep 1 }
実行されると、コンシューマプロセスは新しい作業がキューに到着するのを待ちます。タスクをキューにプッシュするプロデューサープロセスを開始すると、タスクがすぐに処理されることがわかります。
責任を持って楽しんでください。これを本番環境で使用しないでください
本番環境で使用する実際のセットアップからはほど遠いものにしましたが(そうしないでください!)、バックグラウンドプロセッサを構築するためにいくつかの手順を実行しました。まず、プロセスをバックグラウンドサービスとして実行することから始めました。次に、非同期にしてQueue
を使用しました 生産者/消費者問題を解決するため。次に、メモリ内の実装ではなく、Redisを使用してプロセスを複数のプロセスまたはマシンに拡張しました。
前述のように、これはバックグラウンド処理システムの簡略化された実装です。足りないものがたくさんあり、明示的に扱われていません。これらには、エラー処理、複数のキュー、スケジューリング、接続プール、および信号処理が含まれます(ただし、これらに限定されません)。
それでも、これを書くのは楽しかったので、バックグラウンド処理システムの内部を覗いて楽しんでいただければ幸いです。たぶん、あなたは1つか2つを奪ったことさえあります。
-
樽スケール警報システムの構築
以前のブログでは、オフィスでコールドブリューコーヒー樽の重量を使用してSlackアラートを送信し、不足してコールドブリューコーヒー樽の補充が必要になったときに通知する方法について詳しく説明しました。 2部構成のシリーズのこの第2部では、私がどのようにスケールを構築したかを紹介します。主に、時間とお金をかけずに何かが欲しかったのですが、私たちのニーズを満たすものはまだありませんでした。 Elasticsearch樽のセットアップ要件 まず、プロジェクトの要件を分類する必要がありました。 安い(<$ 100) 最小限のケアと給餌 リモート管理 樽のレベルを簡単に報告する 樽から冷たいビールを
-
Ruby2.6の9つの新機能
Rubyの新しいバージョンには、新しい機能とパフォーマンスの改善が含まれています。 変更についていきますか? 見てみましょう! 無限の範囲 Ruby 2.5以前のバージョンは、すでに1つの形式の無限範囲をサポートしています( Float ::INFINITY を使用) )、しかしRuby2.6はこれを次のレベルに引き上げます。 新しい無限の範囲 次のようになります: (1..) これは、(1..10)のような終了値がないため、通常の範囲とは異なります。 。 使用例 : [a, b, c].zip(1..) # [[a, 1], [b, 2], [c, 3]] [1,2,3,