Redis
 Computer >> コンピューター >  >> プログラミング >> Redis

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド

スティーブン・サンウォ著

動作するフルスタック アプリケーションを構築するには、考慮すべき可動部分が非常に多くあります。また、アプリの成功には重要な多くの決定を下す必要があります。

たとえば、どの言語を使用し、どのプラットフォームに展開しますか?コンテナ化されたソフトウェアをサーバーにデプロイするつもりですか、それともバックエンドを処理するためにサーバーレス機能を利用しますか?認証や支払いなど、アプリケーションの複雑な部分を処理するためにサードパーティ API を使用する予定ですか?データはどこに保存しますか?

これらすべてに加えて、アプリケーションのユーザー インターフェイス、デザイン、使いやすさなどについても考慮する必要があります。

このため、複雑で大規模なアプリケーションでは、多機能の開発チームが協力してアプリを構築する必要があります。

フルスタック アプリケーションの開発方法を学ぶ最良の方法の 1 つは、エンドツーエンドの開発プロセスをカバーするプロジェクトを構築することです。アーキテクチャの設計、API サービスの開発、ユーザー インターフェイスの開発を経て、最後にアプリケーションをデプロイします。

したがって、このチュートリアルでは、これらの概念を詳しく学ぶのに役立つ AI チャットボットの構築プロセスを説明します。

私たちが取り上げるトピックには次のようなものがあります。

  • Python、FastAPI、WebSocket を使用して API を構築する方法
  • Redis を使用してリアルタイム システムを構築する方法
  • React を使用してチャット ユーザー インターフェイスを構築する方法

重要な注意: これは、Python と JavaScript の基本的な知識を必要とする中級のフルスタック ソフトウェア開発プロジェクトです。

アプリケーション全体をコーディングしたくない場合でも、重要なフェーズを簡単に選択できるように、プロジェクトを慎重にセクションに分割しました。

ここの My Github で完全なリポジトリをダウンロードできます。

目次

セクション 1

  • アプリケーション アーキテクチャ
  • 開発環境をセットアップする方法

    セクション 2

  • Python、FastAPI、WebSocket を使用してチャット サーバーを構築する方法
    • Python 環境をセットアップする方法
    • FastAPI サーバーのセットアップ
    • API にルートを追加する方法
    • UUID を使用してチャット セッション トークンを生成する方法
    • Postman を使用して API をテストする方法
    • Websocket と接続マネージャー
    • FastAPI での依存関係の挿入

      セクション 3

  • Redis を使用してリアルタイム システムを構築する方法
    • Redis と分散メッセージング キュー
    • Redis クライアントを使用して Python で Redis クラスターに接続する方法
    • Redis ストリームの操作方法
    • チャット データをモデル化する方法
    • Redis JSON の操作方法
    • トークンの依存関係を更新する方法

      セクション 4

  • AI モデルを使用してチャットボットにインテリジェンスを追加する方法
    • Huggingface を始める方法
    • 言語モデルを操作する方法
    • AI モデルの短期記憶をシミュレートする方法
    • ストリーム コンシューマとメッセージ キューからの Real-timeDdata プル
    • AI 応答でチャット クライアントを更新する方法
    • リフレッシュトークン
    • Postman で複数のクライアントとのチャットをテストする方法

アプリケーションのアーキテクチャ

ソリューション アーキテクチャをスケッチすると、アプリケーション、使用する予定のツール、コンポーネントが相互に通信する方法の概要がわかります。

私は、draw.io を使用して、以下の簡単なアーキテクチャを作成しました。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド フルスタック チャットボット アーキテクチャ

アーキテクチャのさまざまな部分をさらに詳しく見てみましょう。

クライアント/ユーザー インターフェース

React バージョン 18 を使用してユーザー インターフェイスを構築します。チャット UI は WebSocket 経由でバックエンドと通信します。

GPT-J-6B とハギングフェイス推論 API

GPT-J-6B は、60 億個のパラメーターでトレーニングされた生成言語モデルで、一部のタスクでは OpenAI の GPT-3 と緊密に動作します。

GPT-J-6B はオープンソース モデルであり、単純なユースケースでは有料トークンを必要としないため、GPT-J-6B を使用することにしました。

Huggingface は、このモデルに接続するためのオンデマンド API もほぼ無料で提供します。 GPT-J-6B とハグ顔推論 API について詳しくは、こちらをご覧ください。

レディス

GPT にプロンプトを送信する場合、プロンプトを保存し、応答を簡単に取得する方法が必要です。 Redis JSON を使用してチャット データを保存し、Huggingface Inference API とのリアルタイム通信を処理するために Redis Streams も使用します。

Redis は、JSON のようなデータの超高速な取得と保存を可能にするインメモリ キー/値ストアです。このチュートリアルでは、テスト目的で Redis Enterprise が提供する管理された無料の Redis ストレージを使用します。

Web ソケットとチャット API

クライアントとサーバー間でメッセージをリアルタイムで送信するには、ソケット接続を開く必要があります。これは、HTTP 接続だけではクライアントとサーバー間のリアルタイムの双方向通信を保証できないためです。

FastAPI は高速で最新の Python サーバーを提供するため、チャット サーバーには FastAPI を使用します。 WebSocket の詳細については、FastAPI ドキュメントを参照してください。

開発環境のセットアップ方法

このアプリの構築には、任意の OS を使用できます。現在、MacOS と Visual Studio Code を使用しています。 Python と NodeJs がインストールされていることを確認してください。

プロジェクト構造を設定するには、fullstack-ai-chatbot という名前のフォルダーを作成します。 。次に、プロジェクト内に client という名前の 2 つのフォルダーを作成します。 および server 。サーバーはバックエンドのコードを保持し、クライアントはフロントエンドのコードを保持します。

次に、プロジェクト ディレクトリ内で、「git init」コマンドを使用してプロジェクト フォルダーのルート内にある Git リポジトリを初期化します。次に、「touch .gitignore」を使用して .gitignore ファイルを作成します。

git init
touch .gitignore

次のセクションでは、FastAPI と Python を使用してチャット Web サーバーを構築します。

Python、FastAPI、WebSocket を使用してチャット サーバーを構築する方法

このセクションでは、FastAPI を使用してユーザーと通信するチャット サーバーを構築します。 WebSocket を使用してクライアントとサーバー間の双方向通信を確保し、リアルタイムでユーザーに応答を送信できるようにします。

Python 環境をセットアップする方法

サーバーを起動するには、Python 環境をセットアップする必要があります。 VS Code 内のプロジェクト フォルダーを開き、ターミナルを開きます。

プロジェクトのルートからサーバー ディレクトリに移動し、python3.8 -m venv env を実行します。 。これにより仮想環境が作成されます。 Python プロジェクトの場合、env という名前になります。 。仮想環境をアクティブにするには、source env/bin/activate を実行します。

次に、Python 環境にいくつかのライブラリをインストールします。

pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis

次に、touch .env を実行して環境ファイルを作成します。 ターミナル内で。 .env 内でアプリ変数とシークレット変数を定義します。 ファイル。

アプリ環境変数を追加し、次のように「開発」に設定します:export APP_ENV=development 。次に、FastAPI サーバーを使用して開発サーバーをセットアップします。

FastAPI サーバーのセットアップ

サーバー ディレクトリのルートに、main.py という名前の新しいファイルを作成します。 次に、開発サーバー用に以下のコードを貼り付けます。

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
 return {"msg": "API is Online"}
if __name__ == "__main__":
 if os.environ.get('APP_ENV') == "development":
 uvicorn.run("main:api", host="0.0.0.0", port=3500,
 workers=4, reload=True)
 else:
 pass

まず import FastAPI api として初期化します。 。次に import load_dotenv python-dotenv から ライブラリを作成し、それを初期化して .env から変数をロードします。 ファイル、

次に、API をテストするための簡単なテスト ルートを作成します。テスト ルートは、API がオンラインであることを示す単純な JSON 応答を返します。

最後に、uvicorn.run を使用して開発サーバーをセットアップします。 そして必要な引数を提供します。 API はポート 3500 で実行されます。 .

最後に、ターミナルで python main.py を使用してサーバーを実行します。 。 Application startup complete が表示されたら ターミナルで、ブラウザで URL http://localhost:3500/test に移動すると、次のような Web ページが表示されるはずです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド API テスト ページ

API にルートを追加する方法

このセクションでは、API にルートを追加します。 src という名前の新しいフォルダーを作成します。 。これは、すべての API コードが存在するディレクトリです。

routes という名前のサブフォルダーを作成します。 、 cd でフォルダーに移動し、chat.py という名前の新しいファイルを作成します。 次に、以下のコードを追加します。

import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
 return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
 return None

3 つのエンドポイントを作成しました。

  • /token チャット セッションにアクセスするためのセッション トークンをユーザーに発行します。チャット アプリは一般に公開されるため、認証について心配する必要はなく、シンプルに保ちたいと考えていますが、それでも各固有のユーザー セッションを識別する方法が必要です。
  • /refresh_token 接続が失われた場合でも、トークンがまだアクティブで有効期限が切れていない限り、ユーザーのセッション履歴を取得します。
  • /chat WebSocket を開いてクライアントとサーバー間でメッセージを送信します。

次に、チャット ルートをメイン API に接続します。まず、import chat from src.chat を行う必要があります。 main.py 内で ファイル。次に、文字通り include_router を呼び出してルーターを組み込みます。 初期化された FastAPI のメソッド クラスを指定し、引数としてチャットを渡します。

api.py を更新してください 以下に示すコード:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
 return {"msg": "API is Online"}
if __name__ == "__main__":
 if os.environ.get('APP_ENV') == "development":
 uvicorn.run("main:api", host="0.0.0.0", port=3500,
 workers=4, reload=True)
 else:
 pass

UUID を使用してチャット セッション トークンを生成する方法

ユーザー トークンを生成するには、uuid4 を使用します。 チャットエンドポイントの動的ルートを作成します。これは公開されているエンドポイントであるため、JWT と認証について詳しく説明する必要はありません。

uuid をインストールしなかった場合 最初に、pip install uuid を実行します。 。次に、chat.py で UUID をインポートし、/token を更新します。 以下のコードを使用してルートします。


from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 token = str(uuid.uuid4())
 data = {"name": name, "token": token}
 return data

上記のコードでは、クライアントは必須の名前を指定します。簡単なチェックを行って名前フィールドが空でないことを確認し、uuid4 を使用してトークンを生成します。

セッション データは、名前とトークンの単純な辞書です。最終的には、このセッション データを永続化し、タイムアウトを設定する必要がありますが、今のところはクライアントに返すだけです。

Postman を使用して API をテストする方法

WebSocket エンドポイントをテストするため、これを可能にする Postman などのツールを使用する必要があります (FastAPI のデフォルトの Swagger ドキュメントは WebSocket をサポートしていないため)。

Postman で、開発環境用のコレクションを作成し、POST リクエストを localhost:3500/token に送信します。 名前をクエリ パラメーターとして指定し、値を渡します。以下に示すような応答が返されるはずです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド トークン ジェネレーターの郵便配達員

Websocket と接続マネージャー

src ルートに、socket という名前の新しいフォルダーを作成します。 connection.py という名前のファイルを追加します。 。このファイルでは、WebSocket への接続を制御するクラスと、接続と切断を行うすべてのヘルパー メソッドを定義します。

connection.py で 以下のコードを追加します。


from fastapi import WebSocket
class ConnectionManager:
 def __init__(self):
 self.active_connections: List[WebSocket] = []
 async def connect(self, websocket: WebSocket):
 await websocket.accept()
 self.active_connections.append(websocket)
 def disconnect(self, websocket: WebSocket):
 self.active_connections.remove(websocket)
 async def send_personal_message(self, message: str, websocket: WebSocket):
 await websocket.send_text(message)

ConnectionManager クラスは active_connections で初期化されます アクティブな接続のリストである属性。

次に、非同期 connect メソッドは WebSocket を受け入れます disconnect をアクティブな接続のリストに追加します。 メソッドは Websocket を削除します アクティブな接続のリストから。

最後に、send_personal_message メソッドはメッセージと Websocket を受け取ります。 メッセージを に送信し、メッセージを非同期に送信したいと考えています。

WebSocket は非常に幅広いトピックなので、ここでは表面だけを取り上げました。ただし、複数の接続を作成し、それらの接続へのメッセージを非同期的に処理するには、これで十分です。

FastAPI Websocket とソケット プログラミングについて詳しくは、こちらをご覧ください。

ConnectionManager を使用するには 、src.routes.chat.py 内でインポートして初期化します。 、/chat を更新します。 以下のコードを使用した WebSocket ルート:

from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

websocket_endpoint 内 WebSocket を受け取る関数では、新しい WebSocket を接続マネージャーに追加し、while True を実行します。 ループを使用して、ソケットが開いたままであることを確認します。ソケットが切断された場合を除きます。

接続が開いている間、クライアントから websocket.receive_test() で送信されたメッセージを受信します。 とりあえず端末に出力してみます。

次に、当面はハードコードされた応答をクライアントに送り返します。最終的に、クライアントから受信したメッセージは AI モデルに送信され、クライアントに返される応答が AI モデルからの応答になります。

Postman では、新しい WebSocket リクエストを作成し、WebSocket エンドポイント localhost:3500/chat に接続することで、このエンドポイントをテストできます。 。

「接続」をクリックすると、「メッセージ」ペインに、API クライアントが URL に接続されており、ソケットが開いていることが表示されます。

これをテストするには、メッセージ「Hello Bot」をチャット サーバーに送信すると、以下に示すように、「応答:GPT サービスからの応答をシミュレートしています」というテスト応答が即座に返されるはずです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド 郵便配達員のチャット テスト

FastAPI での依存関係の注入

2 つの異なるクライアント セッションを区別し、チャット セッションを制限できるようにするために、WebSocket 接続にクエリ パラメーターとして渡される時限トークンを使用します。

ソケット フォルダーに、utils.py という名前のファイルを作成します。 次に、以下のコードを追加します。

from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
 websocket: WebSocket,
 token: Optional[str] = Query(None),
):
 if token is None or token == "":
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
 return token

get_token 関数は WebSocket とトークンを受け取り、トークンが None または null であるかどうかを確認します。

この場合、関数はポリシー違反ステータスを返し、利用可能な場合はトークンのみを返します。最終的には、後でトークン検証を追加してこの関数を拡張します。

この関数を使用するには、それを /chat に挿入します。 ルート。 FastAPI は依存関係を簡単に注入するための depends クラスを提供するため、デコレータをいじる必要はありません。

/chat を更新します。 次へのルート:

from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

/chat に接続しようとすると Postman のエンドポイントを使用すると、403 エラーが発生します。現時点では、クエリ パラメーターとしてトークンを指定し、そのトークンに任意の値を指定します。その後、以前と同様に接続できるはずですが、接続にトークンが必要になるだけです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド トークンを使用したポストマン チャット テスト

ここまで到達できておめでとうございます!あなたの chat.py ファイルは次のようになります:

import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 data = {"name": name, "token": token}
 return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

このチュートリアルの次のパートでは、アプリケーションの状態の処理と、クライアントとサーバーの間でのデータの受け渡しに焦点を当てます。

Redis を使用してリアルタイム システムを構築する方法

現在、私たちのアプリケーションは状態を保存しておらず、ユーザーを識別したり、チャット データを保存および取得したりする方法はありません。また、チャット セッション中にハードコーディングされた応答をクライアントに返します。

チュートリアルのこの部分では、次のことについて説明します。

  • Redis クラスタに接続する方法 Python でRedis クライアントをセットアップします。
  • Redis JSON を使用してデータを保存および取得する方法
  • Redis ストリームの設定方法 Web サーバーとワーカー環境の間のメッセージ キューとして

Redis と分散メッセージング キュー

Redis は、データベース、キャッシュ、メッセージ ブローカー、ストリーミング エンジンとして使用できるオープン ソースのメモリ内データ ストアです。多数のデータ構造をサポートしており、リアルタイム機能を備えた分散アプリケーションに最適なソリューションです。

Redis エンタープライズ クラウド は、Redis が提供するフルマネージドのクラウド サービスで、インフラストラクチャを気にせずに Redis クラスターを無限のスケールでデプロイするのに役立ちます。

このチュートリアルでは、無料の Redis Enterprise Cloud インスタンスを使用します。こちらから Redis Cloud を無料で使い始めることができ、このチュートリアルに従って Redis データベースと Redis Insight (Redis と対話するための GUI) をセットアップできます。

Redis データベースを設定したら、プロジェクト ルート (サーバー フォルダーの外) に worker という名前の新しいフォルダーを作成します。 。

ワーカー環境を Web サーバーから分離することで、クライアントが WebSocket にメッセージを送信するときに、Web サーバーがサードパーティ サービスへのリクエストを処理する必要がなくなります。また、他のユーザーのためにリソースを解放することもできます。

推論 API とのバックグラウンド通信は、Redis を介してこのワーカー サービスによって処理されます。

接続されているすべてのクライアントからのリクエストはメッセージ キュー (プロデューサー) に追加され、ワーカーはメッセージを消費し、リクエストを推論 API に送信し、応答を応答キューに追加します。

API は応答を受信すると、それをクライアントに送り返します。

プロデューサとコンシューマの間の移動中に、クライアントは複数のメッセージを送信でき、これらのメッセージはキューに入れられ、順番に応答されます。

理想的には、このワーカーをまったく別のサーバー上で独自の環境で実行できますが、今のところはローカル マシン上に独自の Python 環境を作成します。

あなたは疑問に思っているかもしれません – なぜ労働者が必要なのでしょうか? Web サーバーがサードパーティ サービスへのリクエストも作成するシナリオを想像してください。これは、ソケット接続中にサードパーティのサービスからの応答を待機している間、サーバーがブロックされ、API から応答が取得されるまでリソースが拘束されることを意味します。

ランダム スリープ time.sleep(10) を作成してこれを試すことができます。 ハードコードされた応答を送信する前と、新しいメッセージを送信する前に。次に、新しいポストマン セッションで別のトークンを使用して接続を試みます。

ランダム スリープがタイムアウトするまで、チャット セッションが接続されないことがわかります。

より運用に重点を置いたサーバー セットアップでは非同期技術とワーカー プールを使用できますが、同時ユーザー数が増えるとそれだけでは十分ではなくなります。

最終的には、Redis を使用してチャット API とサードパーティ API の間の通信を仲介することで、ウェブ サーバー リソースの拘束を回避したいと考えています。

次に、新しいターミナルを開き、ワーカー フォルダーに移動し、パート 1 で行ったのと同様の新しい Python 仮想環境を作成してアクティブにします。

次に、次の依存関係をインストールします。

pip install aiohttp aioredis python-dotenv

Redis クライアントを使用して Python で Redis クラスターに接続する方法

aioredis クライアントを使用して Redis データベースに接続します。また、リクエスト ライブラリを使用して、Huggingface 推論 API にリクエストを送信します。

2 つのファイル .env を作成します 、および main.py 。次に、src という名前のフォルダーを作成します。 。また、redis という名前のフォルダーを作成します。 config.py という名前の新しいファイルを追加します。 。

.env 内 ファイルに次のコードを追加し、Redis クラスターで提供された認証情報でフィールドを更新していることを確認してください。

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

config.py に以下の Redis クラスを追加します。

import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection

Redis オブジェクトを作成し、環境変数から必要なパラメーターを初期化します。次に、非同期メソッド create_connection を作成します。 Redis 接続を作成し、aioredis から取得した接続プールを返します。 メソッド from_url .

次に、以下のコードを実行して、main.py で Redis 接続をテストします。これにより、新しい Redis 接続プールが作成され、単純なキー「key」が設定され、それに文字列「value」が割り当てられます。


from src.redis.config import Redis
import asyncio
async def main():
 redis = Redis()
 redis = await redis.create_connection()
 print(redis)
 await redis.set("key", "value")
if __name__ == "__main__":
 asyncio.run(main())

次に、Redis Insight を開きます (チュートリアルに従ってダウンロードしてインストールした場合) 次のような内容が表示されるはずです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド Redis インサイト テスト

Redis ストリームの操作方法

ワーカー環境のセットアップが完了したので、Web サーバー上にプロデューサーを作成し、ワーカー上にコンシューマを作成できます。

まず、サーバー上に Redis クラスを再度作成しましょう。 server.srcredis という名前のフォルダーを作成します 2 つのファイル config.py を追加します。 と producer.py

config.py で 、ワーカー環境の場合と同様に、以下のコードを追加します。

import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection

.env ファイルに、Redis 認証情報も追加します。

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>

最後に、server.src.redis.producer.py で 次のコードを追加します。


from .config import Redis
class Producer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def add_to_stream(self, data: dict, stream_channel):
 try:
 msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
 print(f"Message id {msg_id} added to {stream_channel} stream")
 return msg_id
 except Exception as e:
 print(f"Error sending msg to stream => {e}")

Redis クライアントで初期化されるプロデューサー クラスを作成しました。このクライアントを使用して、add_to_stream でストリームにデータを追加します。 データと Redis チャネル名を取得するメソッド。

ストリーム チャネルにデータを追加する Redis コマンドは xadd です。 また、aioredis には高レベル関数と低レベル関数の両方があります。

次に、新しく作成したプロデューサーを実行するために、chat.py を更新します。 および WebSocket /chat 以下のようなエンドポイント。更新されたチャンネル名 message_channel に注目してください。 .


from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 try:
 while True:
 data = await websocket.receive_text()
 print(data)
 stream_data = {}
 stream_data[token] = data
 await producer.add_to_stream(stream_data, "message_channel")
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

次に、Postman で接続を作成し、Hello というメッセージを任意の数送信します。 。以下のように、ストリーム メッセージが端末に出力されるはずです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド ターミナル チャネル メッセージ テスト

Redis Insight に、新しい mesage_channel が表示されます。 作成され、クライアントから送信されたメッセージで満たされたタイムスタンプ付きのキューが作成されます。このタイムスタンプ付きのキューは、メッセージの順序を保持するために重要です。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド Redis インサイト チャネル

チャット データをモデル化する方法

次に、チャット メッセージのモデルを作成します。 WebSocket 経由でテキスト データを送信していることを思い出してください。しかし、チャット データにはテキストだけではなく、より多くの情報を保持する必要があります。チャットが送信されたときにタイムスタンプを付け、各メッセージの ID を作成し、チャット セッションに関するデータを収集して、このデータを JSON 形式で保存する必要があります。

この JSON データを Redis に保存できるため、WebSocket は状態を保存しないため、接続が失われたときにチャット履歴が失われることはありません。

server.srcschema という名前の新しいフォルダーを作成します 。次に、chat.py という名前のファイルを作成します。 server.src.schema で 次のコードを追加します。

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
 id = uuid.uuid4()
 msg: str
 timestamp = str(datetime.now())
class Chat(BaseModel):
 token: str
 messages: List[Message]
 name: str
 session_start = str(datetime.now())

Pydantic の BaseModel を使用しています。 チャットデータをモデル化するクラス。 Chat クラスは、単一のチャット セッションに関するデータを保持します。トークン、ユーザー名、および datetime.now() を使用してチャット セッション開始時刻の自動生成されたタイムスタンプを保存します。 。

このチャット セッション内で送受信されたメッセージは、Message で保存されます。 uuid4 を使用してオンザフライでチャット ID を作成するクラス 。この Message を初期化するときに提供する必要がある唯一のデータ class はメッセージ テキストです。

Redis JSON の使用方法

Redis JSON のチャット履歴を保存する機能を使用するには、Redis labs が提供する rejson をインストールする必要があります。

ターミナルで server に cd します。 pip install rejson で rejson をインストールします 。次に、Redis を更新します。 server.src.redis.config.py のクラス create_rejson_connection を含めるには メソッド:


import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 self.REDIS_HOST = os.environ['REDIS_HOST']
 self.REDIS_PORT = os.environ['REDIS_PORT']
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection
 def create_rejson_connection(self):
 self.redisJson = Client(host=self.REDIS_HOST,
 port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
 return self.redisJson

create_rejson_connection を追加しています。 rejson Client を使用して Redis に接続するメソッド 。これにより、aioredis では利用できない、Redis で JSON データを作成および操作するメソッドが提供されます。

次に、server.src.routes.chat.py/token を更新できます 新しい Chat を作成するためのエンドポイント インスタンスを作成し、次のようにセッション データを Redis JSON に保存します。

@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 # Create new chat session
 json_client = redis.create_rejson_connection()
 chat_session = Chat(
 token=token,
 messages=[],
 name=name
 )
 # Store chat session in redis JSON with the token as key
 json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
 # Set a timeout for redis data
 redis_client = await redis.create_connection()
 await redis_client.expire(str(token), 3600)
 return chat_session.dict()

注:これはデモ アプリであるため、チャット データを Redis に長期間保存したくありません。そこで、aioredis クライアントを使用してトークンに 60 分のタイムアウトを追加しました (rejson はタイムアウトを実装していません)。これは、60 分後にチャット セッション データが失われることを意味します。

これが必要なのは、ユーザーを認証しておらず、定義された期間後にチャット データをダンプしたいためです。このステップはオプションであり、含める必要はありません。

次に、Postman で、新しいトークンを作成するために POST リクエストを送信すると、以下のような構造化された応答が返されます。また、Redis Insight をチェックして、トークンを JSON キーとして、データを値として保存されたチャット データを確認することもできます。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド トークン ジェネレーターが更新されました

トークンの依存関係を更新する方法

トークンが生成され、保存されたので、get_token を更新する良い機会です。 /chat の依存関係 ウェブソケット。これは、チャット セッションを開始する前に有効なトークンを確認するために行われます。

server.src.socket.utils.pyget_token を更新します Redis インスタンスにトークンが存在するかどうかを確認する関数。存在する場合はトークンを返します。これは、ソケット接続が有効であることを意味します。存在しない場合は、接続を閉じます。

/token によって作成されたトークン 60分後には消滅します。したがって、チャットを開始しようとしているときにエラー応答が生成された場合に、ユーザーをリダイレクトして新しいトークンを生成するためのシンプルなロジックをフロントエンドに設けることができます。


from ..redis.config import Redis
async def get_token(
 websocket: WebSocket,
 token: Optional[str] = Query(None),
):
 if token is None or token == "":
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
 redis_client = await redis.create_connection()
 isexists = await redis_client.exists(token)
 if isexists == 1:
 return token
 else:
 await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")

依存関係をテストするには、使用したランダム トークンを使用してチャット セッションに接続すると、403 エラーが発生するはずです。 (Redis Insight でトークンを手動で削除する必要があることに注意してください。)

次に、投稿リクエストを /token に送信したときに生成されたトークンをコピーします。 エンドポイント (または新しいリクエストを作成) を選択し、それを /chat で必要なトークン クエリ パラメーターの値として貼り付けます。 ウェブソケット。次に接続します。接続が成功するはずです。

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド トークンを使用したチャット セッション

すべてをまとめると、chat.py は以下のようになります。


import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
 token = str(uuid.uuid4())
 if name == "":
 raise HTTPException(status_code=400, detail={
 "loc": "name", "msg": "Enter a valid name"})
 # Create nee chat session
 json_client = redis.create_rejson_connection()
 chat_session = Chat(
 token=token,
 messages=[],
 name=name
 )
 print(chat_session.dict())
 # Store chat session in redis JSON with the token as key
 json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
 # Set a timeout for redis data
 redis_client = await redis.create_connection()
 await redis_client.expire(str(token), 3600)
 return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
 return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 json_client = redis.create_rejson_connection()
 try:
 while True:
 data = await websocket.receive_text()
 stream_data = {}
 stream_data[token] = data
 await producer.add_to_stream(stream_data, "message_channel")
 await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
 except WebSocketDisconnect:
 manager.disconnect(websocket)

ここまでたどり着いて、よく頑張りました!次のセクションでは、AI モデルとの通信と、クライアント、サーバー、ワーカー、外部 API 間のデータ転送の処理に焦点を当てます。

AI モデルを使用してチャットボットにインテリジェンスを追加する方法

このセクションでは、トランスフォーマー モデルと通信し、ユーザーからのプロンプトを会話形式で API に送信し、チャット アプリケーションの応答を受信して変換するラッパーの構築に焦点を当てます。

Huggingface を始める方法

Hugginface では言語モデルを構築したり展開したりすることはありません。代わりに、Huggingface の高速推論 API を使用して事前トレーニングされたモデルに接続することに焦点を当てます。

使用するモデルは、EleutherAI が提供する GPT-J-6B モデルです。これは、60 億個のパラメーターを使用してトレーニングされた生成言語モデルです。

Huggingface は、このモデルに接続するためのオンデマンド限定 API をほぼ無料で提供します。

Huggingface を使い始めるには、無料のアカウントを作成してください。設定で、新しいアクセス トークンを生成します。最大 30,000 トークンの場合、Huggingface は推論 API へのアクセスを無料で提供します。

ここで API の使用状況を監視できます。このトークンは安全に保管し、公開しないようにしてください。

注:無料アカウントを使用しているため、API との通信には HTTP 接続を使用します。ただし、PRO Huggingface アカウントは、WebSocket によるストリーミングをサポートしており、並列処理とバッチ ジョブを参照できます。

これは、モデルとチャット アプリケーション間の応答時間を大幅に改善するのに役立ちます。この方法については、フォローアップ記事で取り上げたいと思います。

言語モデルを操作する方法

まず、Huggingface 接続認証情報をワーカー ディレクトリ内の .env ファイルに追加します。

export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

次に、worker.srcmodel という名前のフォルダーを作成します 次に、ファイル gptj.py を追加します。 。次に、以下の GPT クラスを追加します。

import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
 def __init__(self):
 self.url = os.environ.get('MODEL_URL')
 self.headers = {
 "Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
 self.payload = {
 "inputs": "",
 "parameters": {
 "return_full_text": False,
 "use_cache": True,
 "max_new_tokens": 25
 }
 }
 def query(self, input: str) -> list:
 self.payload["inputs"] = input
 data = json.dumps(self.payload)
 response = requests.request(
 "POST", self.url, headers=self.headers, data=data)
 print(json.loads(response.content.decode("utf-8")))
 return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
 GPT().query("Will artificial intelligence help humanity conquer the universe?")

GPT クラスは Huggingface モデル url で初期化されます 、認証 header 、および事前定義された payload 。ただし、ペイロード入力は、query によって提供される動的フィールドです。 メソッドは、Huggingface エンドポイントにリクエストを送信する前に更新されます。

最後に、GPT クラスのインスタンスでクエリ メソッドを直接実行して、これをテストします。ターミナルで、python src/model/gptj.py を実行します。 すると、次のような応答が得られるはずです (実際の応答はこれとは異なることに注意してください)。

[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]

次に、入力に微調整を加えて、入力の形式を変更することでモデルとの対話をより会話的にします。

GPT を更新します。 次のようなクラス:


class GPT:
 def __init__(self):
 self.url = os.environ.get('MODEL_URL')
 self.headers = {
 "Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
 self.payload = {
 "inputs": "",
 "parameters": {
 "return_full_text": False,
 "use_cache": False,
 "max_new_tokens": 25
 }
 }
 def query(self, input: str) -> list:
 self.payload["inputs"] = f"Human: {input} Bot:"
 data = json.dumps(self.payload)
 response = requests.request(
 "POST", self.url, headers=self.headers, data=data)
 data = json.loads(response.content.decode("utf-8"))
 text = data[0]['generated_text']
 res = str(text.split("Human:")[0]).strip("\n").strip()
 return res
if __name__ == "__main__":
 GPT().query("Will artificial intelligence help humanity conquer the universe?")

入力を文字列リテラル f"Human: {input} Bot:" で更新しました。 。人間の入力は文字列に配置され、ボットが応答を提供します。この入力形式により、GPT-J6B は会話型モデルになります。その他の変更点には次のようなものがあります。

  • use_cache:入力が同じ場合にモデルで新しい応答を作成する場合は、これを False にできます。ユーザーがボットに同じメッセージをスパム送信し続ける場合に無料トークンが使い果たされるのを防ぐために、運用環境ではこれを True のままにすることをお勧めします。キャッシュを使用しても、実際にはモデルから新しい応答が読み込まれません。
  • return_full_text:入力を返す必要がないため、False になります。入力はすでに持っています。応答を取得すると、応答から「Bot:」と先頭/末尾のスペースを取り除き、応答テキストのみを返します。

AI モデルの短期記憶をシミュレートする方法

モデルに新しい入力を送信するたびに、モデルが会話履歴を記憶する方法はありません。これは、会話の文脈を保持したい場合に重要です。

ただし、モデルに送信するトークンの数が増えると、処理のコストが高くなり、応答時間も長くなることに注意してください。

したがって、短期履歴を取得してモデルに送信する方法を見つける必要があります。また、スイート スポットを把握する必要があります。どのくらいの履歴データを取得してモデルに送信するか?

チャット履歴を処理するには、JSON データベースにフォールバックする必要があります。 token を使用します。 最後のチャット データを取得し、応答を取得したら、その応答を JSON データベースに追加します。

worker.src.redis.config.py を更新します create_rejson_connection を含めるには 方法。また、.env ファイルを認証データで更新し、rejson がインストールされていることを確認します。

あなたの worker.src.redis.config.py 次のようになります:


import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
 def __init__(self):
 """initialize connection """
 self.REDIS_URL = os.environ['REDIS_URL']
 self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
 self.REDIS_USER = os.environ['REDIS_USER']
 self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
 self.REDIS_HOST = os.environ['REDIS_HOST']
 self.REDIS_PORT = os.environ['REDIS_PORT']
 async def create_connection(self):
 self.connection = aioredis.from_url(
 self.connection_url, db=0)
 return self.connection
 def create_rejson_connection(self):
 self.redisJson = Client(host=self.REDIS_HOST,
 port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
 return self.redisJson

.env ファイルは次のようになります。

export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B

次に、worker.src.rediscache.py という名前の新しいファイルを作成します そして、以下のコードを追加します。

from .config import Redis
from rejson import Path
class Cache:
 def __init__(self, json_client):
 self.json_client = json_client
 async def get_chat_history(self, token: str):
 data = self.json_client.jsonget(
 str(token), Path.rootPath())
 return data

キャッシュは rejson クライアントとメソッド get_chat_history で初期化されます。 トークンを受け取り、そのトークンのチャット履歴を Redis から取得します。 Rejson から Path オブジェクトをインポートしていることを確認してください。

次に、worker.main.py を更新します。 以下のコードを使用します:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)
if __name__ == "__main__":
 asyncio.run(main())

Postman での以前のテストから作成されたサンプル トークンをハードコーディングしました。トークンを作成していない場合は、新しいリクエストを /token に送信してください。 トークンをコピーして、python main.py を実行します。 ターミナル内で。次のようにターミナルにデータが表示されるはずです。

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

次に、add_message_to_cache を追加する必要があります。 Cache へのメソッド 特定のトークンのメッセージを Redis に追加するクラス。


 async def add_message_to_cache(self, token: str, message_data: dict):
 self.json_client.jsonarrappend(
 str(token), Path('.messages'), message_data)

jsonarrappend rejson によって提供されるメソッドは、新しいメッセージをメッセージ配列に追加します。

メッセージ配列にアクセスするには、.messages を指定する必要があることに注意してください。 パスへの引数として。メッセージ データが異なる/ネストされた構造を持つ場合は、新しいデータを追加する配列へのパスを指定するだけです。

このメソッドをテストするには、main.py ファイル内の main 関数を以下のコードで更新します。

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
 "id": "1",
 "msg": "Hello",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)

ハードコードされたメッセージをキャッシュに送信し、キャッシュからチャット履歴を取得しています。 python main.py を実行すると ワーカー ディレクトリ内のターミナルでは、次のような内容がターミナルに出力され、メッセージがメッセージ配列に追加されるはずです。

{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}

最後に、メッセージ データを GPT モデルに送信するメイン関数を更新し、 入力を最後の 4 つで更新する必要があります。 クライアントとモデルの間で送信されるメッセージ。

まず、add_message_to_cache を更新しましょう。 この関数には、メッセージが人間かボットかを示す新しい引数「source」が指定されています。次に、この引数を使用して、データをキャッシュに保存する前に「Human:」または「Bot:」タグをデータに追加できます。

add_message_to_cache を更新します。 Cache クラスのメソッドは次のようになります。

 async def add_message_to_cache(self, token: str, source: str, message_data: dict):
 if source == "human":
 message_data['msg'] = "Human: " + (message_data['msg'])
 elif source == "bot":
 message_data['msg'] = "Bot: " + (message_data['msg'])
 self.json_client.jsonarrappend(
 str(token), Path('.messages'), message_data)

Then update the main function in main.py in the worker directory, and run python main.py to see the new results in the Redis database.

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
 "id": "1",
 "msg": "Hello",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)

Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages.

You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo.

In worker.src , create a new folder schema. Then create a new file named chat.py and paste our message schema in chat.py like so:

from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
 id = str(uuid.uuid4())
 msg: str
 timestamp = str(datetime.now())

Next, update the main.py file like below:

async def main():
 json_client = redis.create_rejson_connection()
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
 "id": "3",
 "msg": "I would like to go to the moon to, would you take me?",
 "timestamp": "2022-07-16 13:20:01.092109"
 })
 data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
 print(data)
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 print(msg)
 await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())

In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added.

Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out.

Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string.

Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot"

Next, run python main.py a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model.

Open Redis Insight and you should have something similar to the below:

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド Conversational Chat

Stream Consumer and Real-time Data Pull from the Message Queue

Next, we want to create a consumer and update our worker.main.py to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs.

In worker.src.redis create a new file named stream.py 。 Add a StreamConsumer class with the code below:

class StreamConsumer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def consume_stream(self, count: int, block: int, stream_channel):
 response = await self.redis_client.xread(
 streams={stream_channel: '0-0'}, count=count, block=block)
 return response
 async def delete_message(self, stream_channel, message_id):
 await self.redis_client.xdel(stream_channel, message_id)

The StreamConsumer class is initialized with a Redis client. The consume_stream method pulls a new message from the queue from the message channel, using the xread method provided by aioredis.

Next, update the worker.main.py file with a while loop to keep the connection to the message channel alive, like so:


from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 redis_client = await redis.create_connection()
 consumer = StreamConsumer(redis_client)
 cache = Cache(json_client)
 print("Stream consumer started")
 print("Stream waiting for new messages")
 while True:
 response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
 if response:
 for stream, messages in response:
 # Get message from stream, and extract token, message data and message id
 for message in messages:
 message_id = message[0]
 token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 print(token)
 # Create a new message instance and add to cache, specifying the source as human
 msg = Message(msg=message)
 await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
 # Get chat history from cache
 data = await cache.get_chat_history(token=token)
 # Clean message input and send to query
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 print(msg)
 await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
 # Delete messaage from queue after it has been processed
 await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
 asyncio.run(main())

This is quite the update, so let's take it step by step:

We use a while True loop so that the worker can be online listening to messages from the queue.

Next, we await new messages from the message_channel by calling our consume_stream 方法。 If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model query 方法。

Once we get a response, we then add the response to the cache using the add_message_to_cache method, then delete the message from the queue.

How to Update the Chat Client with the AI Response

So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response.

Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response.

If the connection is closed, the client can always get a response from the chat history using the refresh_token エンドポイント。

In worker.src.redis create a new file named producer.py , and add a Producer class similar to what we had on the chat web server:


class Producer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def add_to_stream(self, data: dict, stream_channel) -> bool:
 msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
 print(f"Message id {msg_id} added to {stream_channel} stream")
 return msg_id

Next, in the main.py file, update the main function to initialize the producer, create a stream data, and send the response to a response_channel using the add_to_stream method:

from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
 json_client = redis.create_rejson_connection()
 redis_client = await redis.create_connection()
 consumer = StreamConsumer(redis_client)
 cache = Cache(json_client)
 producer = Producer(redis_client)
 print("Stream consumer started")
 print("Stream waiting for new messages")
 while True:
 response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
 if response:
 for stream, messages in response:
 # Get message from stream, and extract token, message data and message id
 for message in messages:
 message_id = message[0]
 token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 # Create a new message instance and add to cache, specifying the source as human
 msg = Message(msg=message)
 await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
 # Get chat history from cache
 data = await cache.get_chat_history(token=token)
 # Clean message input and send to query
 message_data = data['messages'][-4:]
 input = ["" + i['msg'] for i in message_data]
 input = " ".join(input)
 res = GPT().query(input=input)
 msg = Message(
 msg=res
 )
 stream_data = {}
 stream_data[str(token)] = str(msg.dict())
 await producer.add_to_stream(stream_data, "response_channel")
 await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
 # Delete messaage from queue after it has been processed
 await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
 asyncio.run(main())

Next, we need to let the client know when we receive responses from the worker in the /chat socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open.

Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read.

In server.src.redis create a new file named stream.py and add our StreamConsumer class like this:

from .config import Redis
class StreamConsumer:
 def __init__(self, redis_client):
 self.redis_client = redis_client
 async def consume_stream(self, count: int, block: int, stream_channel):
 response = await self.redis_client.xread(
 streams={stream_channel: '0-0'}, count=count, block=block)
 return response
 async def delete_message(self, stream_channel, message_id):
 await self.redis_client.xdel(stream_channel, message_id)

Next, update the /chat socket endpoint like so:

from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
 await manager.connect(websocket)
 redis_client = await redis.create_connection()
 producer = Producer(redis_client)
 json_client = redis.create_rejson_connection()
 consumer = StreamConsumer(redis_client)
 try:
 while True:
 data = await websocket.receive_text()
 stream_data = {}
 stream_data[str(token)] = str(data)
 await producer.add_to_stream(stream_data, "message_channel")
 response = await consumer.consume_stream(stream_channel="response_channel", block=0)
 print(response)
 for stream, messages in response:
 for message in messages:
 response_token = [k.decode('utf-8')
 for k, v in message[1].items()][0]
 if token == response_token:
 response_message = [v.decode('utf-8')
 for k, v in message[1].items()][0]
 print(message[0].decode('utf-8'))
 print(token)
 print(response_token)
 await manager.send_personal_message(response_message, websocket)
 await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
 except WebSocketDisconnect:
 manager.disconnect(websocket)

Refresh Token

Finally, we need to update the /refresh_token endpoint to get the chat history from the Redis database using our Cache class.

In server.src.redis , add a cache.py file and add the code below:


from rejson import Path
class Cache:
 def __init__(self, json_client):
 self.json_client = json_client
 async def get_chat_history(self, token: str):
 data = self.json_client.jsonget(
 str(token), Path.rootPath())
 return data

Next, in server.src.routes.chat.py import the Cache class and update the /token endpoint to the below:


from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
 json_client = redis.create_rejson_connection()
 cache = Cache(json_client)
 data = await cache.get_chat_history(token)
 if data == None:
 raise HTTPException(
 status_code=400, detail="Session expired or does not exist")
 else:
 return data

Now, when we send a GET request to the /refresh_token endpoint with any token, the endpoint will fetch the data from the Redis database.

If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found.

How to Test the Chat with multiple Clients in Postman

Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients.

Lastly, we will try to get the chat history for the clients and hopefully get a proper response.

AI チャットボットをマスターする:Redis と Python を使用して強力な GPT 駆動ボットを構築する – 包括的なガイド

Recap

Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token.

Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API.

When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token.

If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection.

Congratulations on getting this far! You have been able to build a working chat system.

In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS.

This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series**

You can download the full repository on My Github Repository

I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources:

  • Try Redis Cloud free of charge
  • Watch this video on the benefits of Redis Cloud over other Redis providers
  • Redis Developer Hub - tools, guides, and tutorials about Redis
  • RedisInsight Desktop GUI

無料でコーディングを学びましょう。 freeCodeCamp のオープンソース カリキュラムは、40,000 人以上の人々が開発者としての職に就くのに役立ちました。始めましょう


  1. redisデータストアでキーの名前を変更する方法– Redis RENAME | RENAMENX

    このチュートリアルでは、redis RENAMEおよびRENAMENXコマンドを使用して、redisデータストアのキーの名前を変更する方法について学習します。 RENAMEコマンド:- RENAMEコマンドで、キーの名前を古い名前から新しい名前に変更します。新しい名前のキーがすでに存在する場合は上書きされ、暗黙のDELコマンドを使用して古い名前のキーが削除されます。それ以外の場合は、古い名前のキーが新しい名前に変更されます。 redisRENAMEコマンドの構文は次のとおりです:- 構文:- redis host:post> RENAME <old name> &

  2. リストの最初の要素を削除して返す方法– Redis LPOP | BLPOP

    このチュートリアルでは、redisデータストアのキーに保存されているリスト値の最初の要素を削除して返す方法について学習します。このために、redis LPOPを使用します およびBLPOP コマンド。 LPOPコマンド このコマンドは、指定されたキーに格納されているリスト値の最初の要素を削除して返すために使用されます。キーが存在しない場合はNilが返され、キーは存在するがキーに格納されている値がリストデータ型ではない場合はエラーが返されます。 RedisLPOPコマンドの構文は次のとおりです:- 構文:- redis host:post> LPOP <keyname>