Upstash Kafka、Redis、Next.js を使用してリアルタイム チャット アプリを構築する
プロジェクトの説明
このブログ投稿では、ユーザーがメッセージ クライアントとチャット ルームを作成できるメッセージング アプリケーションを作成します。さらに、ユーザーは過去のメッセージにアクセスできるようになります。
プロジェクトは 2 ページで構成されます。最初のページはクライアント登録専用であり、一意の名前を持つ複数のクライアントを作成できます。

クライアントのユーザー名をクリックすると、その特定のユーザーに関連付けられたチャットルーム クライアントに移動します。

チャット アプリケーションのロジックは次のとおりです。
ユーザーはインデックス ページ上に複数のクライアントを作成し、それぞれに一意のユーザー名を付けることができます。クライアントのユーザー名をクリックすると、新しいタブが開き、一意のパスを持つ別のクライアントが表示されます。
各クライアントは、WebSocket 接続を介してメッセージ サーバーに接続されます。クライアント上で新しいメッセージが作成されると、そのクライアントに関連付けられたメッセージ サーバーに送信されます。
メッセージ サーバーはメッセージ トラフィックを処理します。クライアントが WebSocket 接続を通じてメッセージを送信すると、サーバーはそのメッセージを Kafka ブローカーに送信します。各メッセージ サーバーは NodeJS スレッドを実行して受信メッセージを処理します。メッセージが消費されると、既存の WebSocket 接続を通じてクライアントに送信されます。クライアント側で受信メッセージを処理するには、react-use-websocket を使用します。 ライブラリ。
アプリケーションは Upstash Redis を利用してメッセージ履歴を保存します。メッセージが Kafka に対して生成されると、そのメッセージは Redis データベースにも保存されます。新しいクライアントを作成すると、古いメッセージが Upstash Redis から取得され、チャット表示に表示されます。
アプリケーションの概要は次のとおりです。
注: この実装では、デモ目的で単一のメッセージ サーバーを作成します。メッセージの負荷を処理するためにサーバーの数を増やすこともできます。

デモ
ここからアプリのデモにアクセスできます。アプリケーションの現在のバージョンは Fly にデプロイされています。
はじめに
チャット アプリケーションを構築する手順は次のとおりです。
<オル>Upstash Redis データベースの作成
Upstash コンソールに移動してログインし、Redis にログインします。 タブで、[データベースの作成] をクリックします。 ボタン。

このようにして、Redis を使用する準備が整いました。資格情報を得るために Redis コンソールに戻ります。
Upstash Kafka クラスターの作成
ここでカフカに切り替えます。 Tab キーを押して、[クラスターの作成] をクリックします。 ボタン。クラスター名を入力して続行します。次に、Kafka トピックを作成して確認します。

次のアプリの作成
まず、ターミナルからアプリケーションのルート フォルダーを作成し、そこに移動します。 Next アプリとサーバーをこのフォルダーに保存します。
mkdir chat-app
cd chat-app 次に、次のアプリを作成します。
$ npx create-next-app@latest
✔ What is your project named? … next-chat-app
✔ Would you like to use TypeScript? … Yes
✔ Would you like to use ESLint? … Yes
✔ Would you like to use Tailwind CSS? … No
✔ Would you like to use `src/` directory? … No
✔ Would you like to use App Router? (recommended) … No
✔ Would you like to customize the default import alias? … No 認証情報の処理
.env という名前のファイルを作成します。 資格情報を保存します。認証情報を何度もコピーして貼り付ける必要はなく、このファイルからインポートするだけです。
まず、.env を作成します。 ファイル。
次に、Redis コンソールに移動し、UPSTASH_REDIS_REST_URL をコピーして貼り付けます。 と UPSTASH_REDIS_REST_TOKEN 資格情報を .env に設定します ファイル。

最後に、Kafka コンソールに切り替えて、UPSTASH_KAFKA_REST_URL を転送します。 、UPSTASH_KAFKA_REST_USERNAME 、UPSTASH_KAFKA_REST_PASSWORD

さて、あなたの .env ファイルは同様に見えるはずです
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...
UPSTASH_KAFKA_REST_URL=...
UPSTASH_KAFKA_REST_USERNAME=...
UPSTASH_KAFKA_REST_PASSWORD=... 認証情報の設定が完了したので、アプリケーションを続行します。
クライアント登録ページ
インデックス ページには、クライアントの登録/作成操作が含まれます。ユーザー名が送信されると、新しいクライアントが作成され、現在のクライアントの下にリストされます。 テーブル。
ページ/index.tsximport { useState } from "react";
import Link from "next/link";
import { Redis } from "@upstash/redis";
import styles from "@/styles/Home.module.css";
export default function Home() {
const [usernameInput, setUsernameInput] = useState<string>("");
const [usernameList, setUsernameList] = useState<string[]>(Array<string>);
const handleInputChange = (e: React.ChangeEvent<HTMLInputElement>): void => {
const inputValue: string = e.target.value;
setUsernameInput(inputValue);
};
const addUsernameClient = (e: React.FormEvent<HTMLFormElement>): void => {
e.preventDefault();
setUsernameList([...usernameList, usernameInput]);
setUsernameInput("");
};
return (
<div className={styles.container}>
<div className={styles.welcomeSection}>
<h1>Welcome to the demo message app!</h1>
<p>
This application uses Upstash Kafka for message passing, and Upstash
Redis for state management.
<br />
<br />
To get started, create several clients by typing in unique usernames to
the input section below and submitting.
<br />
<br />
The usernames will be added to the list of current clients. Click on a
username to open a new tab with that client's message display.
<br />
<br />
You can have multiple sessions open at once.
</p>
</div>
<form className={styles.formSection} onSubmit={addUsernameClient}>
<input
type="text"
className={styles.formInput}
value={usernameInput}
onChange={handleInputChange}
></input>
<button className={styles.formSubmit} type="submit">
Create the client!
</button>
</form>
<div className={styles.clientListSection}>
<p className={styles.clientListHeader}>Current Clients</p>
<div className={styles.clientList}>
{usernameList.map((username, i) => {
return (
<Link
href={`/user/${username}`}
key={`${i}`}
className={styles.userClient}
target={"_blank"}
>
<p>{username}</p>
</Link>
);
})}
</div>
</div>
</div>
);
} アプリをリロードするたびにチャット履歴をリセットしたい場合は、次の関数を使用できます:
ページ/index.tsxexport async function getServerSideProps() {
const redis = new Redis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
});
await redis.del("messagesList");
return {
props: {},
};
}
これで、インデックス ページを実行する準備が整いました。 npm run dev を実行します next-chat-app のコマンド フォルダに移動して、インデックス ページが公開されることを確認します。
メッセージクライアントページ
クライアントの動的ルーティングを実装するには、/pages/user/[username].tsx という名前のフォルダーを作成します。 このフォルダ構造により、ユーザー名に基づいて個々のクライアントの動的ルートを作成できるようになります。
これがクライアントの主要コンポーネントです。このコンポーネントは、メッセージ、ユーザー名などのリストの状態を保持します。 useWebSocket フックを使用して、WebSocket からのメッセージ、接続、および切断イベントを作成します。メッセージ イベントが発行されると、メッセージがメッセージ リストに追加され、MessageDisplay コンポーネントが再レンダリングされます。
/pages/user/[ユーザー名].tsximport { useState } from "react";
import { useRouter } from "next/router";
import { Redis } from "@upstash/redis";
import useWebSocket from "react-use-websocket";
import styles from "@/styles/Home.module.css";
type Message = {
id: number;
sender: string;
text: string;
};
export default function MessageApp(props: { messagesData: Message[] }) {
const { messagesData } = props;
const { username } = useRouter().query;
const [inputText, setInputText] = useState<string>("");
const [messageList, setMessageList] = useState<Message[]>(messagesData);
const [messageCounter, setMessageCounter] = useState<number>(0);
const handleMessage = function (message: Message) {
const nextMessages = [...messageList, message];
setMessageList(nextMessages);
};
// handling WebSocket events
const { sendMessage } = useWebSocket("ws://localhost:8080", {
share: true,
filter: () => false,
onOpen: () => {
console.log("WebSocket connection!");
return "connection";
},
onMessage: (message) => {
const data = JSON.parse(message.data);
const { sender, text }: { sender: string; text: string } = data;
const messageData: Message = {
id: messageCounter,
sender: sender,
text: text,
};
setMessageCounter(messageCounter + 1);
handleMessage(messageData);
return message;
},
onClose: () => {
console.log("WebSocket disconnected!");
return "disconnected";
},
});
function handleSendMessage(messageText: string) {
const messageData = {
sender: username,
text: messageText,
};
sendMessage(JSON.stringify(messageData));
}
return (
<div className={styles.Container}>
<MessageDisplay messages={messageList} />
<MessageInput
inputText={inputText}
setInputText={setInputText}
handleSendMessage={handleSendMessage}
/>
</div>
);
} MessageDisplay コンポーネントと MessageInput コンポーネントは次のとおりです。
/pages/user/[ユーザー名].tsxconst MessageDisplay = function (props: { messages: Message[] }) {
const { messages } = props;
return (
<div className={styles.messageContainer}>
{messages.map((message) => (
<MessageBubble
key={message.id}
sender={message.sender}
text={message.text}
/>
))}
</div>
);
};
const MessageInput = (props: {
inputText: string;
setInputText: (msg: string) => void;
handleSendMessage: (msg: string) => void;
}) => {
const { inputText, setInputText, handleSendMessage } = props;
const handleInputChange = (
e: React.ChangeEvent<HTMLInputElement>
): void => {
const inputValue: string = e.target.value;
setInputText(inputValue);
};
const handleSubmit = (e: React.FormEvent<HTMLFormElement>): void => {
e.preventDefault();
handleSendMessage(inputText);
if (inputText.trim() !== "") {
setInputText(" ");
}
};
return (
<form className={styles.inputSection} onSubmit={handleSubmit}>
<input
className={styles.inputText}
type="text"
value={inputText}
onChange={handleInputChange}
></input>
<button className={styles.inputSendButton} type="submit">
Send
</button>
</form>
);
};
const MessageBubble = (props: {
sender: string;
text: string;
key: number;
}) => {
const { sender, text } = props;
const { username } = useRouter().query;
const isSender = sender === username;
const senderClass = isSender ? "sender" : "receiver";
return (
<div className={`${styles["messageBubble"]} ${styles[senderClass]}`}>
<div className={styles.messageSender}>
{isSender ? "You" : sender}
</div>
<div className={styles.messageText}>{text}</div>
</div>
);
};
チャット履歴をクライアントに提供するには、getServerSideProps() を使用します。 機能。
export async function getServerSideProps() {
const redis = new Redis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
});
const messagesData = (await redis.lrange("messagesList", 0, -1)).reverse();
return {
props: {
messagesData,
},
};
} Next.js アプリが動作するようになりました。ページを更新し、クライアントを作成し、そのうちの 1 つに移動します。クライアントページが表示されます。ただし、メッセージ フローを処理するにはメッセージ サーバーが必要です。
メッセージ サーバーの作成
サーバーの構造はかなりシンプルです。これを機能させるために、Node.js、ws ライブラリ、および Upstash Kafka を使用します。まず、server を作成します。 chat-app folder 内のフォルダー .
mkdir server
cd server
server 内 フォルダーに必要なものをインストールし、ファイルを構成します。
npm install typescript ws tsc @upstash/kafka @types/ws
tsc --init
次に、WebSocket、KafkaProducer、および Kafka Consumer クライアントを /server/message_server.ts 内に作成します。 ファイル:
import * as http from "http";
import { Kafka } from "@upstash/kafka";
import { Redis } from "@upstash/redis";
import { WebSocket } from "ws";
const server = http.createServer();
const wss = new WebSocket.Server({ server });
server.listen(8080, () => {
console.log("Server is running on port 8080");
});
const kafka = new Kafka({
url: process.env.UPSTASH_KAFKA_REST_URL,
username: process.env.UPSTASH_KAFKA_REST_USERNAME,
password: process.env.UPSTASH_KAFKA_REST_PASSWORD,
});
const redis = new Redis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
});
const consumer = kafka.consumer();
const producer = kafka.producer();
const clients = new Set<WebSocket>();
WebSocket と対話するために、connection を作成しています。 と message イベント。
wss.on("connection", async (connection, req) => {
clients.add(connection);
console.log(`New client connected!`);
connection.on("message", async (message) => {
const jsonMessage = message.toString();
console.log("Received message:", JSON.parse(jsonMessage));
producer.produce("chat", jsonMessage);
});
connection.on("close", () => {
console.log(`Client disconnected:`);
clients.delete(connection);
});
}); 最後に、事前定義された間隔でメッセージを消費するスレッドを作成して実行します。
/server/message_server.tsasync function run() {
while (true) {
const messages = await consumer.consume({
consumerGroupId: "group_1",
instanceId: "instance_1",
topics: ["chat"],
autoOffsetReset: "earliest",
});
if (messages.length != 0) {
for (let i = 0; i < messages.length; i++) {
await redis.lpush("messagesList", messages[i].value);
console.log(`Message sending: ${messages[i].value}`);
clients.forEach((connection: WebSocket) => {
connection.send(messages[i].value);
});
}
}
console.log("Run!");
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
すべての準備が整いました。私たちのアプリは今のところ魅力的に動作しているはずです。メッセージ サーバーをローカルで実行し、クライアント ページを更新すると、クライアント間で送信されたメッセージを確認できます。以下のコマンドは TS ファイルをトランスコンパイルし、localhost:8000 でサーバーを実行します。
tsc message_server.ts
node message_server.js 展開
デプロイメントには Fly.io を使用します。まだアカウントをお持ちでない場合は、開始する前にアカウントを作成してください。
メッセージ サーバーの展開
server に移動します。 フォルダーに移動し、flyctl をインストールします。 CLI ツール、シェル経由で承認
npm install flyctl
flyctl auth login
構成ファイルを作成するには、flyctl init を実行します。 。これにより、fly.toml が作成されます 。 fly.toml に移動します そして、WebSocket 接続設定用に次の行を挿入します。
[[services]]
internal_port = 8080
protocol = "tcp"
[services.concurrency]
hard_limit = 25
soft_limit = 20
[[services.ports]]
handlers = ["http"]
port = "80"
[[services.ports]]
handlers = ["tls", "http"]
port = "443"
[[services.tcp_checks]]
interval = 10000
timeout = 2000
さて、サーバーの最後のステップです。 flyctl deploy を実行します 、準備は完了です。デプロイメントプロセスが完了すると、flyctl はサーバーにエンドポイントを提供します。そのエンドポイントをコピーしてください。この場合、エンドポイントは message-server.fly.dev です。 .
次のアプリケーションのデプロイ
Next.js アプリケーションをデプロイする前に、メッセージ サーバーのデプロイ エンドポイントを埋め込む必要があります。 pages/user/[username].tsx 内の WebSocket URL を置き換えてください。 ws://localhost:8080 からのファイル flyctl からエンドポイントへ 、wss:// と組み合わせます。 接頭語。この場合、それは wss://message-server.fly.dev です。 .
次に、next-chat-app で フォルダーで、server と同じコマンドを実行します。 。今回は、fly.toml を編集する必要はありません。 ファイルがあるので、その手順なしで続行できます。
flyctl init
flyctl deploy
もう終わりです! flyctl open を実行すると、 コマンドを実行すると、デプロイされたプロジェクトに移動します。
結論と提案
フォローしていただきありがとうございます!
プロジェクトの Github リポジトリはここにあります。
プロジェクトの作業を続けたい場合は、次のような提案があります:
-
現在、ページがリロードされるたびに、Upstash Redis に保存されているすべてのメッセージがフラッシュされます。この動作は、
pages/index.tsxのコードによって制御されます。 ファイル、特にgetServerSideProps内 機能。ただし、ユーザーがページをリロードしようとすると、チャットルームのすべての参加者のチャット履歴が削除されるという重大な問題が発生します。
この問題を解決するには、メッセージが送信されるたびにチャット履歴の TTL の拡張を実装することが推奨されます。この改善により、ページがリロードされた後でもチャット履歴にアクセスでき、保存されることが保証されます。 -
複数のチャットルーム機能を実装できます。これを実現するには、チャットルームごとに一意の名前を持つ複数の Kafka トピックを作成します。もう 1 つの方法は、適切なデータ構造を使用してメッセージ サーバー自体でメッセージを処理することです。
-
また、複数のメッセージ サーバーとロード バランサを実装して、システム設計のベスト プラクティスを適用することもできます。
ご質問がございましたら、fahreddin@upstash.com までご連絡ください。
-
負荷分散セットアップで Redis キャッシュを使用した Azure App Service での効率的なセッション管理
従来のロード バランシング環境でのセッション 一般に、すべての Web アプリケーションではインメモリ セッション (RAM に保存されたデータ) を使用します。これは、専用 VM または共有ホスティング プランでアプリケーションをホストする従来のホスティング環境のほとんどでうまく機能します。 ただし、トラフィックが増加した場合は、複数の Web サーバーを作成し、ロード バランサーを使用してトラフィックを制御することで負荷分散を計画します。これらのシナリオでは、リクエスト (単一セッションに関連する) が複数のサーバーによって処理されるため、セッションは機能しません (同じサーバーで単一
-
エッジキャッシングを使用した5ミリ秒のグローバルRedisレイテンシ
データベースとクライアントが同じリージョンにある場合、Redisを使用すると1ミリ秒のレイテンシーが簡単になります。ただし、クライアントをグローバルに分散させたい場合は、遅延が100ミリ秒を超えて増加します。これを克服するためにEdgeCachingを構築しました。 エッジキャッシング エッジキャッシングを使用すると、REST応答は、CDNと同様に、世界中のエッジロケーションにキャッシュされます。エッジキャッシングが有効になっている場合、平均で5msのグローバルレイテンシが見られます。 10の異なるリージョンにあるクライアントからのレイテンシー数を記録するベンチマークアプリケーションを参照し