AppSignalメトリクスを使用してキューワーカーを効率的にスケーリングする
ほとんどのウェブアプリは、エラーが発生しやすい、または時間のかかるサイドジョブの処理によく使用されるバックグラウンドキューの恩恵を受けることができます。これらのバックグラウンドジョブは、メールの送信、キャッシュの更新、コアビジネスロジックの実行などさまざまです。
バックグラウンドキューイングシステムは処理する必要のあるジョブの数をスケーリングするため、それらのジョブを処理するワーカーのプールもスケーリングする必要があります。キューに入れられるジョブの割合が異なる場合、キューワーカーの数をスケールアップすることが重要な側面になります。さらに、キューのスループットが低いときにワーカーを縮小すると、大幅な節約が可能になります。
残念ながら、多くのキューイングバックエンドには、ワーカーをオンまたはオフにするスケーリングロジックが装備されていませんが、いくつかの簡単な数学およびパフォーマンスデータを使用して、キューで待機している作業に基づいて最適なワーカー数を見つけることができます。
経験則のキューイング
ジョブがキューワーカーによって処理されるよりも高いレートでキューに入れられると、キューの深さが増し、各ジョブがキューで費やす時間も長くなります。一般に、待機時間(各ジョブのキュー)を可能な限り低くします— 0秒から許容可能な制限まで。希望の待機時間を満足するために必要なワーカーの数を見積もるには、キューイングルールオブサム(QROT)を使用できます。通常、 QROTは、ジョブのキューを処理するために必要なサーバーの数を表す不等式として表されますが、1つの形式は次のように記述できます。
workers = (number_of_jobs * avg_service_time_per_job) / time_to_finish_queue
したがって、たとえば30秒という希望の時間にキューを処理するために必要なワーカーの数を把握したい場合は、ジョブの数(キューのサイズ)とそれにかかる平均時間を知る必要があります。各ジョブを実行します。たとえば、7500のジョブのキューがあり、各ジョブの実行に平均0.3秒かかる場合、75人のワーカーでそのキューを30秒で終了できます。
パフォーマンスメトリクスへのアクセス
キュー内のジョブの平均サービス時間を見積もるには、各ジョブクラスのパフォーマンスメトリックにアクセスする必要があります。幸い、AppSignalは、一般的なキューイングバックエンドのパフォーマンスデータをすぐに記録し、毎回のメトリックを記録します。ジョブが実行されました。
今後のAppSignalGraphQLAPIを使用して、過去24時間の各ジョブタイプの平均期間を取得できます。このAPIはまだ完全には公開されていませんが、現在AppSignalのパフォーマンスグラフやその他のデータ表示に使用されています。幸い、GraphQLAPIは自己文書化を目的としており、GraphiQLなどのツールを使用してAPIをイントロスペクトし、APIが公開するデータオブジェクトを見つけることができます。
GraphQLクエリを作成するプロセスはこの投稿の範囲外ですが、以下は、人気のあるFaradayHTTPクライアントライブラリを使用してAppSignalGraphQL APIに接続し、基本的なメトリック集約をクエリするRubyクラスの例です。
require 'json'
require 'faraday'
class AppsignalClient
BASE_URL = 'https://appsignal.com/'
DEFAULT_APP_ID = ENV['APPSIGNAL_APP_ID']
DEFAULT_TOKEN = ENV['APPSIGNAL_API_TOKEN']
# GraphQL query to fetch the "mean" metric for the selected app.
METRICS_QUERY = <<~GRAPHQL.freeze
query($appId: String!, $query: [MetricAggregation!]!, $timeframe: TimeframeEnum!) {
app(id: $appId) {
metrics {
list(timeframe: $timeframe, query: $query) {
start
end
rows {
fields {
key
value
}
}
}
}
}
}
GRAPHQL
def initialize(app_id: DEFAULT_APP_ID, client_secret: DEFAULT_TOKEN)
@app_id = app_id
@client_secret = client_secret
end
# Fetch the average duration for a job class's perform action
# Default timeframe is last 24 hours
def average_job_duration(job_class, timeframe: 'R24H')
response =
connection.post(
'graphql',
JSON.dump(
query: METRICS_QUERY,
variables: {
appId: @app_id,
timeframe: timeframe,
query: [
name: 'transaction_duration',
headerType: legacy
tags: [
{ key: 'namespace', value: 'background' },
{ key: 'action', value: "#{job_class.name}#perform" },
],
fields: [{ field: 'MEAN', aggregate: 'AVG' }],
],
}
)
)
data = JSON.parse(response.body, symbolize_names: true)
rows = data.dig(:data, :app, :metrics, :list, :rows)
# There may be no metrics in the selected timeframe
return 0.0 if rows.empty?
rows.first[:fields].first[:value]
end
private
def connection
@connection ||= Faraday.new(
url: BASE_URL,
params: { token: @client_secret },
headers: { 'Content-Type' => 'application/json' },
request: { timeout: 10 }
) do |faraday|
faraday.response :raise_error
faraday.adapter Faraday.default_adapter
end
end
end
このクラスを使用すると、特定のActiveJobクラスの平均ジョブ期間を取得でき、ミリ秒単位で返されます。
AppsignalClient.new.average_job_duration(MyMailerJob)
# => 233.1
デフォルトでは、これは過去24時間のデータにわたるジョブの平均トランザクション期間を必要とします。ジョブがそれよりもはるかに頻繁に実行される場合は、そのウィンドウを短縮して、最近の実行をより重視することができます。平均。たとえば、1時間に数百回実行されるジョブがある場合、timeframe
を変更したい場合があります。 1時間まで(R1H
)現在実行されている場合、そのような1つのジョブの期間をより正確に見積もることができます。
このパフォーマンスデータは、サーバー使用率データとは別のものであることに注意してください。このデータは、各ジョブに必要な作業を実際に実行するのにかかる時間を示します。これは、使用率メトリックなどの外部測定よりも、ワーカーのスケーリングに役立ちます。 。
キューの内省
次に、キューをイントロスペクトして、サービス対象のジョブを決定する必要があります。一般的なRubyキューイングバックエンドはResqueであり、これもActiveJobとうまく統合されます。Resqueの特定のキューのキューに入れられたジョブにアクセスして、 AppsignalClient
を使用して、クラスに基づいて各ジョブを実行します 上からのクラス。
require 'resque'
class ResqueEstimator
def initialize(queue: 'default')
@queue = queue
@cache = {}
@appsignal_client = AppsignalClient.new
end
def enqueued_duration_estimate
Resque.data_store.everything_in_queue(queue).map do |job|
estimate_job_duration decode_activejob_args(job)
end.sum
end
def estimate_job_duration(job)
@cache[job['job_class']] ||= @appsignal_client
.average_job_duration job['job_class']
end
private
# ActiveJob-specific method for parsing job arguments
# for ActiveJob+Resque integration
def decode_activejob_args(job)
decoded_job = job
decoded_job = Resque.decode(job) if job.is_a? String
decoded_job['args'].first
end
end
このクラスの使用は次のように簡単です。
ResqueEstimator.new(queue: 'my_queue').enqueued_duration_estimate
# => 23000 (ms)
estimate_job_duration
でジョブ期間の簡単なメモ化を使用していることに注意してください AppSignal APIへの重複呼び出しを回避する方法。ほとんどの場合、キューには同じクラスの多くのジョブが含まれ、各クラスの実行を1回だけ見積もることで、オーバーヘッドを削減できます。
パフォーマンスデータを使用したスケーリング
これらすべてをまとめると、最近のパフォーマンスデータを使用して、キューのコンテンツに基づいてキューワーカーをスケールアップまたはスケールダウンできます。いつでも、キュー内のジョブを確認して、必要なワーカーの見積もりを取得できます。希望の制限時間内にサービスを提供します。
希望するキューイング時間制限(ジョブがキューで待機する最大時間)を決定する必要があります。 30秒。最小および最大のワーカー数も指定する必要があります。キューがしばらく空になった後にキューに入れられた最初のジョブを処理するために、少なくとも1人のワーカーをキューで実行し続けると便利です。また、ワーカー数が多すぎてデータベース接続やサーバーの使用コストが過大になるのを防ぐために、最大のワーカー数が必要です。
このロジックを処理するクラスを作成できます。これは基本的に、以前の経験則の実装にすぎません。
class ResqueWorkerScaler
def initialize(queue: 'default', workers_range: 1..100, desired_wait_ms: 300_000)
@queue = queue
@workers_range = workers_range
@desired_wait_ms = desired_wait_ms
@estimator = ResqueEstimator.new(queue: @queue)
end
def desired_workers
total_time_ms = @estimator.enqueued_duration_estimate
workers_required = [(total_time_ms / desired_wait_ms).ceil, workers_range.last].min
[workers_required, workers_range.first].max
end
def scale
# using platform-specific scaling interface, scale to desired_workers
end
end
需要に基づいてスケールアップおよびスケールダウンするように、定期的にワーカーをスケールアップする必要があります。 ResqueWorkerScaler
を呼び出すRakeタスクを作成できます 労働者を拡大するクラス:
# inside lib/tasks/resque_workers.rake
namespace :resque_workers do
desc 'Scale worker pool based on enqueued jobs'
task :scale, [:queue] => [:environment] do |_t, args|
queue = args[:queue] || 'default'
ResqueWorkerScaler.new(queue: queue).scale
end
end
次に、このRakeタスクを定期的に実行するようにcronジョブを設定できます。
*/5 * * * * /path/to/our/rake resque_workers:scale
# scale a non-default queue:
*/5 * * * * /path/to/our/rake resque_workers:scale['my_queue']
スケーリングタスクを5分ごとに実行するように設定していることに注意してください。新しいワーカーはそれぞれ、オンラインになってジョブの処理を開始するのにある程度の時間がかかります。コードベースのサイズとgemの数にもよりますが、おそらく10〜40秒です。したがって、ワーカーを1分ごとにスケーリングしようとすると、目的の変更が有効になる前に再度スケールアップまたはスケールダウンする可能性があります。アプリでキューの使用量が1日のさまざまな時間にのみ変動する場合は、次のことができます。 1時間間隔でRakeタスクを呼び出す可能性がありますが、キューサイズが1時間以内に変動する場合は、上記の5分など、より頻繁な間隔でキューをイントロスペクトする必要があります。
次のステップ
実際のパフォーマンスデータを使用してインフラストラクチャをスケーリングするこのようなシステムは、需要に非常に対応し、さまざまな使用法に対応できます。特に、メモリ使用量や負荷平均などのホストメトリックが変化する可能性が低いバックグラウンド処理などの環境では、パフォーマンスメトリックを使用してスケールの方がはるかに適切です。
代替のキュースケーリングの実装では、キュー全体を内省する代わりに、ジョブごとの平均待機時間を測定できますが、キューの内容とサイズが急速に変化する場合、そのメトリックは代表的ではない可能性があります。または、ジョブの実行時間が大きく変動する場合、キューのイントロスペクションは応答がはるかに速く、確実に修正されます。
ただし、キューイントロスペクションシステムで考慮すべきいくつかの制限があります。キューが十分に大きい場合、実行見積もりのために各ジョブを調べるのは非常に遅くなります。そのような場合は、合計ジョブ数を見つけてから、を選択する方がよい場合があります。キューからのジョブのランダムな代表的なサンプリングと、そのサンプルからの平均実行を計算します。または、ジョブクラスにパフォーマンスデータがまだ関連付けられていない場合は、実行されて記録されるまで、想定される実行時間を使用する必要があります。数回。
上で概説したシステムは、いくつかの調整で大幅に改善することもできます。各見積もりは分離されており、独立しているため、各ジョブクラスの実行時間を並行して見積もることを検討してください。また、キューのイントロスペクションを更新して、現在実行されているジョブを含めることもできます。ワーカーを使用して、合計サービス時間の見積もりの精度を向上させます。複数のキューを使用するバックグラウンド処理アーキテクチャの場合、キューの優先度に基づいて各キューに必要な待機時間を割り当て、ワーカーを適切にスケーリングできます。
キューイングシステムは、どのプロジェクトでも非常に変動性の高い作業を多く収集する傾向があります。キューからのジョブの実行に関するパフォーマンスデータを使用すると、リソースを効果的にスケーリングして、応答性が高く効率的な方法ですべての作業にサービスを提供できます。
ハッピースケーリング!
P.S。 Ruby Magicの投稿をマスコミから離れたらすぐに読みたい場合は、Ruby Magicニュースレターを購読して、投稿を1つも見逃さないでください。
-
MongoDBを使用したスケーリング:シャーディングインフラストラクチャのセットアップ
最近のブログ投稿で、MongoDBをスケーリングする必要がある場合について説明しました。この投稿では、MongoDBをスケーリングする方法に焦点を当てています。 MongoDBバージョン3.0では、デフォルトのストレージエンジンとしてWiredTigerが導入されました。それ以来、MongoDBは、スケーラビリティに関して2つのアプローチを提供できるようになりました。 Mongoは、水平方向だけでなく垂直方向にも拡張できるようになりました。どちらのアプローチも詳細を確認する必要があります。 垂直方向のスケーリング 垂直方向にスケーリングすると、CPUの数や種類、RAMやディスク容
-
CloudflareワーカーとのRedis@Edge
エッジでのコンピューティングは、近年最もエキサイティングな機能の1つです。 CDNを使用すると、ファイルをユーザーに近づけることができます。エッジコンピューティングを使用すると、アプリケーションをユーザーの近くで実行できます。これは、開発者がグローバルに分散されたパフォーマンスの高いアプリケーションを構築するのに役立ちます。 Cloudflare Workersは、現在この分野の主要製品です。コールドスタートのないサーバーレス処理環境を提供します。 Cloudflareのグローバルネットワークを活用して、アプリケーションのレイテンシーを最小限に抑えます。関数はJavascript、Rust、