Sidekiqジョブの重複を回避する3つの方法
Rubyコードを記述している場合は、Sidekiqを使用してバックグラウンド処理を処理している可能性があります。 ActiveJob
から来ている場合 または他の背景については、しばらくお待ちください。ここで取り上げたヒントのいくつかは、そこにも適用できます。
人々はさまざまなケースで(Sidekiq)バックグラウンドジョブを利用します。いくつかのクランチナンバー、いくつかのユーザーへのウェルカムメールのディスパッチ、およびいくつかのスケジュールデータ同期。あなたのケースが何であれ、あなたは最終的に重複した仕事を避けるための要件に遭遇するかもしれません。重複した仕事によって、私はまったく同じことをする2つの仕事を想像します。それについて少し詳しく見ていきましょう。
ジョブの重複を排除する理由
あなたの仕事が次のようになるシナリオを想像してみてください:
class BookSalesWorker
include Sidekiq::Worker
def perform(book_id)
crunch_some_numbers(book_id)
upload_to_s3
end
...
end
BookSalesWorker
常に同じことを行います— book_id
に基づいてDBに本を照会します 最新の売上データを取得して、いくつかの数値を計算します。次に、それらをストレージサービスにアップロードします。ウェブサイトで本が販売されるたびに、このジョブがキューに入れられることに注意してください。
さて、一度に100の売り上げを得たらどうなるでしょうか?あなたはこれらの仕事の100がまったく同じことをしているでしょう。多分あなたはそれで大丈夫です。 S3writeについてはそれほど気にせず、キューはそれほど混雑していないため、負荷を処理できます。しかし、「それはスケーリングしますか?」™️
まあ、絶対にありません。より多くの本の売り上げを受け取り始めると、キューはすぐに不要な作業でいっぱいになります。 1冊の本に対して同じことを行う100のジョブがあり、10冊の本が並行して販売されている場合、キューの奥深くに1000のジョブがあり、実際には、各本に対して10のジョブしかありません。
それでは、重複するジョブがキューに積み重なるのを防ぐ方法について、いくつかのオプションを見ていきましょう。
1。 DIYウェイ
外部の依存関係や複雑なロジックのファンでない場合は、先に進んで、コードベースにいくつかのカスタムソリューションを追加できます。サンプルリポジトリを作成して、例を直接試してみました。各アプローチには、例へのリンクがあります。
1.1ワンフラグアプローチ
ジョブをエンキューするかどうかを決定するフラグを1つ追加できます。sales_enqueued_at
を追加することもできます。 彼らの本のテーブルに入れて、それを維持します。例:
module BookSalesService
def schedule_with_one_flag(book)
# Check if the job was enqueued more than 10 minutes ago
if book.sales_enqueued_at < 10.minutes.ago
book.update(sales_enqueued_at: Time.current)
BookSalesWorker.perform_async(book.id)
end
end
end
つまり、最後のジョブがキューに入れられてから10分が経過するまで、新しいジョブはキューに入れられません。 10分が経過したら、sales_enqueued_at
を更新します。 新しいジョブをキューに入れます。
もう1つできることは、ブール値である1つのフラグを設定することです。例:crunching_sales
。 crunching_sales
を設定します 最初のジョブがキューに入れられる前にtrueになります。次に、ジョブが完了したら、falseに設定します。スケジュールを取得しようとする他のすべてのジョブは、crunching_sales
まで拒否されます は誤りです。
作成したリポジトリの例でこのアプローチを試すことができます。
1.22つのフラグによるアプローチ
ジョブが10分間キューに入れられないように「ロック」するのは怖すぎるように聞こえますが、コードに追加のフラグがあれば問題ない場合は、次の提案に興味があるかもしれません。
既存のsales_enqueued_at
に別のフラグを追加できます — sales_calculated_at
。すると、コードは次のようになります。
module BookSalesService
def schedule_with_two_flags(book)
# Check if sales are being calculated right now
if book.sales_enqueued_at <= book.sales_calculated_at
book.update(sales_enqueued_at: Time.current)
BookSalesWorker.perform_async(book.id)
end
end
end
class BookSalesWorker
include Sidekiq::Worker
def perform(book_id)
crunch_some_numbers(book_id)
upload_to_s3
# New adition
book.update(sales_calculated_at: Time.current)
end
...
end
試してみるには、サンプルリポジトリの手順を確認してください。
ここで、ジョブがキューに入れられてから終了するまでの時間の一部を制御します。その時間帯では、ジョブをキューに入れることはできません。ジョブの実行中、sales_enqueued_at
sales_calculated_at
よりも大きくなります 。ジョブの実行が終了すると、sales_calculated_at
sales_enqueued_at
よりも大きくなります(より最近になります) 新しいジョブがキューに入れられます。
2つのフラグを使用すると興味深い場合があるため、UIでそれらの販売数が最後に更新された時刻を表示できます。そうすれば、それらを読んだユーザーは、データがどれだけ新しいかを知ることができます。お互いに有利な状況。
フラグの合計
必要なときにこのようなソリューションを作成したくなるかもしれませんが、私には、それらは少し不器用に見え、オーバーヘッドが追加されます。ユースケースが単純な場合はこれを使用することをお勧めしますが、複雑または不十分であることが判明したらすぐに、他のオプションを試してみることをお勧めします。
フラグアプローチの大きな欠点は、これらの10分間にキューに入れようとしたすべてのジョブが失われることです。大きな利点は、依存関係を持ち込まないことです。これにより、キュー内のジョブ数が非常に迅速に軽減されます。
1.3キューのトラバース
実行できる別のアプローチは、同じジョブがエンキューされないようにするカスタムロックメカニズムを作成することです。興味のあるSidekiqキューウェアをチェックし、仕事(労働者)がすでにそこにいるかどうかを確認します。コードは次のようになります:
module BookSalesService
def schedule_unique_across_queue(book)
queue = Sidekiq::Queue.new('default')
queue.each do |job|
return if job.klass == BookSalesWorker.to_s &&
job.args == [book.id]
end
BookSalesWorker.perform_async(book.id)
end
end
class BookSalesWorker
include Sidekiq::Worker
def perform(book_id)
crunch_some_numbers(book_id)
upload_to_s3
end
...
end
上記の例では、'default'
かどうかを確認しています。 キューには、クラス名がBookSalesWorker
のジョブがあります 。また、ジョブの引数が本のIDと一致するかどうかも確認しています。 BookSalesWorker
の場合 同じブックIDのジョブがキューにある場合、別のブックIDをスケジュールするのではなく、早期に戻ります。
キューが空であるためにジョブのスケジュールが速すぎると、それらの一部がスケジュールされる可能性があることに注意してください。ローカルでテストしたときに、正確なことが起こりました:
100.times { BookSalesService.schedule_unique_across_queue(book) }
リポジトリの例で試すことができます。
このアプローチの良いところは、必要に応じてすべてのキューをトラバースして既存のジョブを検索できることです。短所は、キューが空で、一度に多数のジョブをスケジュールした場合でも、重複するジョブを保持できることです。また、1つをスケジュールする前に、キュー内のすべてのジョブをトラバースする可能性があるため、のサイズによってはコストがかかる可能性があります。キュー。
2。 SidekiqEnterpriseへのアップグレード
あなたまたはあなたの組織にいくらかのお金がかかっている場合は、Sidekiqのエンタープライズバージョンにアップグレードできます。月額179ドルからで、重複する仕事を避けるのに役立つクールな機能があります。残念ながら、私はSidekiqEnterpriseを持っていませんが、それらのドキュメントで十分だと思います。次のコードを使用して、一意の(複製されていない)ジョブを簡単に実行できます。
class BookSalesWorker
include Sidekiq::Worker
sidekiq_options unique_for: 10.minutes
def perform(book_id)
crunch_some_numbers(book_id)
upload_to_s3
end
...
end
以上です。 「OneFlagApproach」セクションで説明したものと同様のジョブ実装があります。ジョブは10分間一意になります。つまり、同じ引数を持つ他のジョブをその期間にスケジュールすることはできません。
かなりクールなワンライナーですね。 Enterprise Sidekiqをお持ちで、この機能について知ったばかりの場合は、私がお手伝いできて本当にうれしいです。私たちのほとんどはそれを使用するつもりはないので、次の解決策に飛び込みましょう。
3。 sidekiq-unique-jobs To The Rescue
はい、私たちは宝石について言及しようとしていることを知っています。そして、はい、それにはいくつかのLuaファイルが含まれているため、一部の人を先送りにする可能性があります。しかし、我慢してください、それはあなたがそれで得ている本当に甘い取引です。 sidekiq-unique-jobgemには、多くのロックやその他の構成オプションが付属しています。おそらく必要以上のものです。
すばやく開始するには、sidekiq-unique-jobs
を配置します gemをGemfileに入れ、bundle
を実行します 次のようにワーカーを構成します:
class UniqueBookSalesWorker
include Sidekiq::Worker
sidekiq_options lock: :until_executed,
on_conflict: :reject
def perform(book_id)
book = Book.find(book_id)
logger.info "I am a Sidekiq Book Sales worker - I started"
sleep 2
logger.info "I am a Sidekiq Book Sales worker - I finished"
book.update(sales_calculated_at: Time.current)
book.update(crunching_sales: false)
end
end
たくさんのオプションがありますが、私はこれを単純化して使用することにしました:
sidekiq_options lock: :until_executed, on_conflict: :reject
lock: :until_executed
最初のUniqueBookSalesWorker
をロックします それが実行されるまでjob。 on_conflict: :reject
を使用 、実行しようとする他のすべてのジョブをデッドキューに拒否する必要があると言っています。ここで達成したことは、上記のトピックのDIYの例で行ったことと似ています。
これらのDIYの例に対するわずかな改善は、何が起こったかの一種のログがあることです。それがどのように見えるかを理解するために、次のことを試してみましょう:
5.times { UniqueBookSalesWorker.perform_async(Book.last.id) }
1つのジョブのみが完全に実行され、他の4つのジョブはデッドキューに送られ、そこで再試行できます。このアプローチは、重複するジョブが単に無視された例とは異なります。
ロックと競合解決に関しては、選択できるオプションがたくさんあります。特定のユースケースについては、gemのドキュメントを参照することをお勧めします。
優れた洞察
このgemの優れている点は、ロックとキューでダウンしたものの履歴を表示できることです。必要なのは、config/routes.rb
に次の行を追加することだけです。 :
# config/routes.rb
require 'sidekiq_unique_jobs/web'
Rails.application.routes.draw do
mount Sidekiq::Web, at: '/sidekiq'
end
元のSidekiqクライアントが含まれますが、さらに2つのページが表示されます。1つはジョブロック用で、もう1つは変更ログ用です。見た目は次のとおりです:
「ロック」と「変更ログ」という2つの新しいページがあることに注目してください。かなりクールな機能。
gemがインストールされてすぐに使用できるexampleprojectで、これらすべてを試すことができます。
なぜLuaなのか
まず第一に、私は宝石の作者ではないので、ここで物事を想定しています。初めて宝石を見たとき、私は疑問に思いました:なぜRuby宝石の中でLuaを使うのですか?最初は奇妙に見えるかもしれませんが、RedisはLuaスクリプトの実行をサポートしています。宝石の作者はこれを念頭に置いており、Luaでもっと機敏なロジックを実行したいと思っていたと思います。
gemのリポジトリにあるLuaファイルを見ると、それほど複雑ではありません。すべてのLuaスクリプトは、後でSidekiqUniqueJobs::Script::Caller
のRubyコードから呼び出されます。 ここで。ソースコードを見てください。物事がどのように機能するかを読んで理解するのは興味深いことです。
代替宝石
ActiveJob
を使用する場合 広範囲にわたって、active-job-uniqueness
を試すことができます アイデアは似ていますが、カスタムLuaスクリプトの代わりに、[Redlock]を使用してRedisのアイテムをロックします。
この宝石を使ってユニークな仕事をするために、あなたはこのような仕事を想像することができます:
class BookSalesJob < ActiveJob::Base
unique :until_executed
def perform
...
end
end
構文はそれほど冗長ではありませんが、sidekiq-unique-jobs
と非常によく似ています。 宝石。 ActiveJob
に大きく依存している場合は、ケースが解決する可能性があります 。
最終的な考え
あなたがyourappで重複する仕事を処理する方法についてある程度の知識を得たことを願っています。私は間違いなく、さまざまなソリューションを調べて遊んで楽しんでいました。探しているものが見つからなかった場合は、いくつかの例があなた自身の何かを作成するきっかけになったと思います。
これがすべてのコードスニペットを含むサンプルプロジェクトです。
次はお会いしましょう、乾杯。
P.S。 Ruby Magicの投稿をマスコミから離れたらすぐに読みたい場合は、Ruby Magicニュースレターを購読して、投稿を1つも見逃さないでください。
-
iPhone 2022 で重複した写真を削除する 3 つの方法
画像の重複は、すべてのスマートフォン、Android、または iOS で発生する進行中の問題です。 AndroidフォンはSDカードのような拡張メモリ機能を備えていますが、製造時に割り当てられた内蔵ストレージスペースに制限されているのはiPhoneユーザーです.したがって、写真コレクションを整理するとともに、iPhone ユーザーにとって貴重なストレージ容量を節約することが重要になります。この投稿では、Duplicate Photos Fixer として知られる重複写真クリーナー アプリケーションを使用して、iPhone で重複写真を削除する 3 つの方法について説明します。 iPhone で
-
Windows 10 でキーボード言語を変更する 3 つの最適な方法
新しいデバイスを箱から出すとき、たとえそれが Windows 10 のキーボード レイアウトであっても、選択したものすべてが必要です!結局のところ、それはあなたのコンピューターであるため、Windows 10 の壁紙からキーボードの設定まで、すべての設定を選択する必要があります。 ここでは、Windows 10 でキーボード言語を変更する 3 つの最良の方法を紹介します。 方法 1:Windows の設定から始めて、Windows 10 でキーボードの設定を変更する Windows キーと I を同時に押して、Windows 10 の設定アプリを開きます。 Windows の設定で、[時