Kafkaを使用したRailsでのイベントストリーミング
企業は、大量のデータをリアルタイムで処理および共有する必要性に迅速に対応して、洞察を得て、より魅力的な顧客体験を生み出すことを望んでいます。そのため、従来のデータ処理は今日の世界ではもはや実行可能ではありません。
これを実現するには、大量のデータを可能な限り高速に処理してから、他のサービスに送信してさらに処理する必要があります。ただし、これらすべての迅速なアクションの途中で、イベントが発生したときに消費者に通知する必要があります。これは、イベントストリーミングを使用して行うことができます。
これは、使用するGitHubのリポジトリです。
イベントストリーミングについて話す前に、イベントとは何かについて話しましょう。アプリケーション内で発生するイベントは、ユーザープロセス、または単にビジネスに影響を与えるアクションに関連している可能性があります。
イベントは状態の変化を表し、アプリケーションを変更する方法の問題ではありません。これらを例として考えてください:
- サービスにログインしているユーザー
- 支払い取引
- ブログに投稿を公開しているライター
ほとんどの場合、イベントはより多くのイベントをトリガーします。たとえば、ユーザーがサービスに登録すると、アプリはデバイスに通知を送信し、データベースにレコードを挿入して、歓迎のメールを送信します。
イベントストリーミング データベースなどのイベントソースからリアルタイムでデータをキャプチャするためのパターンです。イベントストリーミングの主な部分は次のとおりです。
- ブローカー :イベントの保存を担当するシステム
- トピック :イベントのカテゴリ
- プロデューサー :特定のトピックに関するイベントをブローカーに送信します
- 消費者 :イベントを読みます
- イベント :生産者が消費者に伝えたいデータ
パブリッシュおよびサブスクライブアーキテクチャパターン(pub / subパターン)について話すことは避けられません。 この時点で;イベントストリーミングはそのパターンの実装ですが、次の変更が加えられています:
- メッセージの代わりにイベントが発生します。
- イベントは、通常は時間順に並べられます。
- 消費者は、トピックの特定のポイントからイベントを読み取ることができます。
- イベントには一時的な耐久性があります。
フローは、プロデューサーが開始したときに始まります 新しいイベントを公開します トピックに (前に見たように、トピックは特定のタイプのイベントの分類にすぎません)。次に、消費者 特定のカテゴリのイベントに興味がある場合は、そのトピックを購読してください。最後に、ブローカー トピックの利用者を特定し、目的のイベントを利用できるようにします。
-
デカップリング 出版社と消費者はお互いを知る必要がないため、依存関係はありません。さらに、イベントはアクションを指定しないため、多くの消費者が同じイベントを取得して異なるアクションを実行する可能性があります。
-
低レイテンシ イベントは分離されており、消費者はいつでもそれらを利用できます。ミリ秒単位で発生する可能性があります。
-
独立 ご存知のように、パブリッシャーとコンシューマーは独立しているため、異なるチームが同じイベントを使用して他のアクションや目的で作業することができます。
-
フォールトトレランス 一部のイベントストリーミングプラットフォームは、消費者の失敗に対処するのに役立ちます。たとえば、消費者は自分の位置を保存して、エラーが発生した場合にそこからやり直すことができます。
-
リアルタイム処理 フィードバックはリアルタイムで受信されるため、ユーザーはイベントの応答を確認するために数分または数時間待つ必要はありません。
-
高性能 イベントプラットフォームは、待ち時間が短いため、多くのメッセージを処理できます。たとえば、1秒間に数千のイベントを処理できます。
-
監視 一部のイベントストリーミングツールには、完全な監視ツールがありません。彼らは、DatadogやNewRelicなどの追加のツールを実装することを求めています。
-
構成 一部のツールの構成は、経験豊富な人でも圧倒される可能性があります。多くのパラメータがあり、場合によっては、それらを実装するために主題について深く知る必要があります。
-
クライアントライブラリ Java以外の言語でKafkaを実装するのは簡単ではありません。場合によっては、クライアントライブラリが最新ではない、不安定である、または選択できる選択肢が多くないことがあります。
イベントストリーミングで最も人気のあるツールの1つは、 Apache Kafkaです。 。このツールを使用すると、ユーザーは必要なときにいつでもどこでもデータを送信、保存、および要求できます。それについて話しましょう。
Apache Kafka
「ApacheKafkaは、高性能データパイプライン、ストリーミング分析、データ統合、ミッションクリティカルなアプリケーションのために何千もの企業で使用されているオープンソースの分散イベントストリーミングプラットフォームです。」
Apache Kafkaは、リアルタイムのログ送信用に特別に設計されているため、以下を必要とするアプリケーションに最適です。
- 異なるコンポーネント間の信頼性の高いデータ交換
- アプリケーション要件の変化に応じてメッセージングワークロードを分割する機能
- データ処理のためのリアルタイム送信
RailsアプリケーションでKafkaを使用しましょう!
RubyでKafkaを使用する最も有名な宝石は、Zendeskによってruby-kafkaと呼ばれ、素晴らしいです!それでも、すべての実装を手動で行う必要があります。そのため、ruby-kafkaで構築された「フレームワーク」がいくつかあります。また、すべての構成と実行の手順を支援します。
Karafkaは、ApacheKafkaベースのRubyアプリケーション開発を簡素化するために使用されるフレームワークです。
Kafkaを使用するには、Javaをインストールする必要があります。 KafkaはScalaおよびJavaアプリケーションでもあるため、Zookeeperのインストールが必要になります。
インストールの前に、Zookeeperについて少し説明したいと思います。 Zookeeperは、Kafkaに不可欠な一元化されたサービスです。新しいトピックの作成、ブローカーのクラッシュ、ブローカーの削除、トピックの削除などの変更があった場合に通知を送信します。
その主なタスクは、Kafkaブローカーを管理し、それぞれのメタデータを含むリストを維持し、ヘルスチェックメカニズムを促進することです。さらに、トピックのさまざまなパーティションの主要なブローカーを選択するのに役立ちます。
MacOSの場合:
それでは、次のコマンドを使用してJavaとZookeeperをインストールしましょう。
brew install java
brew install zookeeper
その後、これを実行してKafkaのインストールを続行できます:
brew install kafka
KafkaとZookeeperをインストールしたら、次の方法でサービスを開始する必要があります。
brew services start zookeeper
brew services start kafka
WindowsおよびLinuxの場合:
手順:
- Javaのインストール
- Zookeeperをダウンロード
いつものように単純なRailsアプリケーションを作成するだけです:
rails new karafka_example
そして、Gemfile内にカラフカジェムを追加します:
gem 'karafka'
次に、bundle install
を実行します 最近追加されたgemをインストールするには、次のコマンドを実行してすべてのKarafkaのものを取得することを忘れないでください。
bundle exec karafka install
このコマンドは、いくつかの興味深いファイルを生成するはずです。最初のファイルはkarafka.rb
です。 ルートディレクトリのapp/consumers/application_consumer.rb
、およびapp/responders/application_responder.rb
。
karafka.rb
ファイルは、Rails構成から分離された初期化アプリケーションのようなものです。これにより、Karafkaアプリケーションを構成し、APIの点でRailsアプリケーションルートと同様のいくつかのルートを描画できます。しかし、ここでは、トピックと消費者向けです。
プロデューサー はイベントの作成を担当しており、イベントをapp/responders
に追加できます。 フォルダ。それでは、ユーザー向けの簡単なプロデューサーを作成しましょう。
# app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
消費者 プロデューサーから送信されたすべてのイベント/メッセージを読み取る責任があります。これは、受信したメッセージをログに記録する単なるコンシューマーです。
# app/consumers/users_consumer.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
params
を使用します イベントを取得します。ただし、イベントをバッチで読み取り、構成config.batch_fetching
がある場合 trueの場合、params_batch
を使用する必要があります 。
Karafkaサービス(イベントを聞くサービス)を実行するには、コンソールに移動し、新しいタブを開いて、Railsプロジェクトに移動し、次のコマンドを実行します。
bundle exec karafka server
次に、別のコンソールタブを開き、Railsプロジェクトに移動して、次のように入力します。
rails c
そこで、レスポンダーと一緒にイベントを作成しましょう:
> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })
Railsコンソールを確認すると、イベントの作成後に次のメッセージが表示されます。
Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}
また、Karafkaの[サービス]タブには、次のようなものが表示されます。
New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092
ただし、メッセージペイロードだけが必要な場合は、params.payload
を追加できます。 あなたの消費者にそしてあなたはこのようなものを持つでしょう:
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
email
のようないくつかの属性を持つユーザーモデルを作成できます 、first_name
およびlast_name
次のコマンドを実行します:
rails g model User email first_name last_name
次に、これを使用して移行を実行できます:
rails db:migrate
次に、次のような検証を追加します。
class User < ApplicationRecord
validates :email, uniqueness: true
end
最後に、消費者を変えることができます:
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
それでは、同じメールアドレスで2つのイベントを作成しましょう:
UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
これにより、最初のイベントがデータベースに作成されます:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.1ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (9.6ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "1"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.0ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer
ただし、2番目のメールは失敗します。これは、メールが一意であるという検証があるためです。既存のメールで別のレコードを追加しようとすると、次のように表示されます。
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Exists? (0.3ms) SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2 [["email", "[email protected]"], ["LIMIT", 1]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (0.2ms) ROLLBACK
↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
最後の行にエラーが表示されますActiveRecord::RecordInvalid: Validation failed: Email has already been taken
。しかし、ここで興味深いのは、Kafkaがイベントを何度も処理しようとすることです。 Karafkaサーバーを再起動しても、最後のイベントを処理しようとします。カフカはどこから始めればよいのかをどうやって知るのですか?
コンソールが表示された場合、エラーの後に次のように表示されます:
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42
どのオフセットが処理されたかがわかります。この場合はオフセット42でした。したがって、Karafkaサービスを再起動すると、そのオフセットで開始されます。
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches
ユーザーモデルに電子メール検証があるため、それでも失敗します。この時点で、Karafkaサーバーを停止し、その検証を削除またはコメントして、サーバーを再起動します。イベントが正常に処理される方法がわかります:
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (3.8ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "2"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.5ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed
最後に、最後の行に次のメッセージが表示されます:Marking users/0:43 as processed
。
これはKarafkaが提供するクールなものです。コンシューマーでコールバックを使用できます。これを行うには、モジュールをインポートして使用するだけです。次に、UserConsumer
を開きます そしてこれを追加します:
class UsersConsumer < ApplicationConsumer
include Karafka::Consumers::Callbacks
before_poll do
Karafka.logger.info "*** Checking something new for #{topic.name}"
end
after_poll do
Karafka.logger.info '*** We just checked for new messages!'
end
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
ポーリングは、現在のパーティションオフセットに基づいてレコードをフェッチするための媒体です。したがって、これらのコールバックはbefore_poll
およびafter_poll
、その名前が示すように、その瞬間に実行されます。メッセージをログに記録しているだけで、Karafkaサーバーでメッセージを確認できます。1つはフェッチ前、もう1つはフェッチ後です。
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!
ハートビートは、消費者として、私たちが生きているとカフカに言う方法です。そうでなければ、カフカは消費者が死んでいると想定します。
Karafkaには、一定期間内にこれを行うためのデフォルトの設定があります。 kafka.heartbeat_interval
です デフォルトは10秒です。このハートビートはKarafkaサーバーで確認できます。
*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!
Sending heartbeat...
、カフカは私たちが生きていることを知っており、私たちはその消費者グループの有効なメンバーです。また、より多くのレコードを消費できます。
オフセットを消費済みとしてマークすることは、オフセットのコミットと呼ばれます。 Kafkaでは、offsetsトピックと呼ばれる内部Kafkaトピックに書き込むことにより、オフセットコミットを記録します。メッセージは、そのオフセットがオフセットトピックにコミットされた場合にのみ消費されたと見なされます。
Karafkaには、このコミットを毎回自動的に実行する構成があります。構成はkafka.offset_commit_interval
です。 、およびその値はデフォルトで10秒です。これにより、Karakfaは10秒ごとにオフセットコミットを実行し、Karafkaサーバーでそのメッセージを表示できます。
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!
Committing offsets: users/0:44
コミットしているオフセットを教えてください。私の場合、トピック0からオフセット番号44をコミットできることをKafkaに伝えました。このようにして、サービスで何かが発生した場合、Karafkaはそのオフセットからのイベントの処理を再開できます。
イベントストリーミングは、より高速になり、データをより有効に活用し、より優れたユーザーエクスペリエンスを設計するのに役立ちます。実際のところ、多くの企業がこのパターンを使用して、すべてのサービスを伝達し、さまざまなイベントにリアルタイムで対応できるようにしています。前に述べたように、Railsで使用できるKarafka以外の選択肢があります。あなたはすでに基本を持っています。今、それらを自由に試してみてください。
- https://kafka.apache.org/
- https://github.com/karafka/karafka
- https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern
-
RailsでのTailwindCSSの使用
CSSは魔法のようですが、時間がかかります。美しく、機能的で、アクセスしやすいサイトを使用するのは楽しいことですが、独自のCSSを作成するのは大変です。 Bootstrapなどの多くのCSSライブラリは近年爆発的に増加しており、Tailwindは2021年にパックをリードしています。 RailsにはTailwindが付属していませんが、この記事では、TailwindCSSを新しいRubyon Railsプロジェクトに追加する方法を説明します。これにより、設計の実装にかかる時間を節約できます。また、Tailwindのユーティリティクラスを使用した設計のウォークスルーも行います。このチュートリア
-
Rails5でのAngularの使用
あなたは前にその話を聞いたことがあります。分散型で完全に機能するバックエンドAPIと、通常のツールセットで作成されたフロントエンドで実行されているアプリケーションがすでにあります。 次に、Angularに移動します。または、AngularをRailsプロジェクトと統合する方法を探しているだけかもしれません。これは、この方法を好むためです。私たちはあなたを責めません。 このようなアプローチを使用すると、両方の世界を活用して、たとえばRailsとAngularのどちらの機能を使用してフォーマットするかを決定できます。 構築するもの 心配する必要はありません。このチュートリアルは、この目的のた