Redis
 Computer >> コンピューター >  >> プログラミング >> Redis

ストリームを使用したRedisの時系列

Redisは、「データ構造サーバー」と見なされることが多く、いくつかの単純なデータ構造プリミティブにネットワークインターフェイスを提供します。ストリームは、Redisが何年も前にソートされたセットを導入して以来、最初の主要な新しい汎用データ構造です。この新しい構造の主な用途の1つである時系列データのモデリングを見てみましょう。

ストリーム:新しいRedisデータ構造

Redis Streamsは、キーと値のペアの追加のみの時系列を表します。

任意の数のクライアントがストリームに書き込むことができ、書き込むたびに、時系列に挿入されたアイテムの一意の昇順IDを受け取ります。

データを読み取るクライアントは、新しいデータの受信中にブロックしたり、バッチ処理のために最後に読み取ったメッセージの「ブックマーク」を維持したり、ワークロードを共有してメッセージを確認したりするためのより複雑な「コンシューマーグループ」に編成したりできます。

ストリームは大きなトピックであるため、この投稿では、ストリームを使用してRedisで時系列データをモデル化する方法について簡単に説明します。新しいデータ構造により、これは、ListまたはSortedSetタイプを使用して時系列をモデル化するなど、以前のアプローチよりも劇的に単純になります。

ここでのコード例では、node.js用のioredisクライアントを使用します:

var Redis = require('ioredis');
var redis = new Redis();
ストリームへのデータの送信

ストリームは追加専用であるため、通常、必要な詳細は、書き込みたいRedisキーの名前と、キーと値のペアのセットだけです。

空気質センサーからの測定値を記録し、キーと値のペアのストリームをsite:pdxに送信します。 キー-具体的には、currentAir Quality Indexと摂氏での気温を送信します:

redis.xadd('site:pdx', '*',
           'aqi', 37,
           'tempc', 5.1).then(function(id) {
  console.log("id:", id);
});

> 1527974818120-0

XADDを送信します 記録する新しい測定値があるときはいつでもコマンド。応答は、クエリで使用できる一意の常に昇順のIDです。IDの最初の部分、1527974818120 Redisサーバーによって割り当てられたタイムスタンプです。 IDの2番目の部分は、複数のクライアントが同時に書き込む可能性がある場合の衝突を回避するための増分番号です。

*を提供する 上記のように、書き込みコマンドの2番目の引数として、Redisに独自のタイムスタンプを使用してデータを保存することを通知します

データを書き込むときにタイムスタンプを指定することもできますが、通常は推奨されません。Redisにタイムスタンプを選択させると、クライアントがIDの選択と順序付けについて調整する必要なしに、多くのクライアントが単一のストリームに同時にデータを書き込むことができます。Redisサーバーはこれらの詳細を処理するだけです。クライアント。

簡単な読み方

いくつかの値がストリームに含まれると、IDまたはタイムスタンプの範囲を指定することにより、値の範囲を取得できます。これは、私の空気質指標の最新の測定値をグラフに表示するアプリで使用できます:

redis.xrange('site:pdx',
             '1527974818120-0',
             '+',
             'COUNT', 5).then(function(resp) {

  // resp now holds 5 readings, pass them to the open graph:
  // console.log(resp);
});

> [ [ '1543947167906-0', [ 'aqi', '31', 'tempc', '5.1' ] ],
> [ '1543947168312-0', [ 'aqi', '31', 'tempc', '5.3' ] ],
> [ '1543947168901-0', [ 'aqi', '31', 'tempc', '5.4' ] ],
> [ '1543947170033-0', [ 'aqi', '31', 'tempc', '5.4' ] ],
> [ '1543947171460-0', [ 'aqi', '31', 'tempc', '5.6' ] ] ]

ストリーム内のどこからでもさまざまな数値をサンプリングできるため、パフォーマンスを低下させることなく、グラフ作成システムで履歴データをクエリできます。

ブロックとポーリング

データの範囲のクエリはグラフや履歴の監視に役立ちますが、データの受信時にすぐに応答できるシステムを構築したい場合があります。XREADを使用すると、Redisストリームもこれに最適です。 :

redis.xread('BLOCK', 10000,
            'STREAMS', 'site:pdx', '$').then(function(resp) {

  // close the windows if aqi > 50
  console.log(resp);
});

このようなブロック操作は、データが到着するまで、またはタイムアウト(ここでは1000ms)まで待機するため、データが利用可能になるまで上記の例のようにブロックし続けるポーリングを維持し、同じXREADを呼び出すだけです。 データを受信した後、またはコマンドがタイムアウトした後は、毎回コマンドを再実行してください。

ストリームへの接続間でデータが失われないようにするために、ストリームから読み取るときにストリームから読み取った最後のIDを指定できます。これにより、中断したところから正確に再開できます。

上記の場合、streamsoに入力されたデータはすべて、「新しいデータのみ」を示すために特別なトークン「$」を使用したかっただけです。コマンドは、指定されたIDとわずかに異なるように見えます:

redis.xread('BLOCK', 10000,
            'STREAMS', 'site:pdx', '1543947171460-0');

また、一度に多くの異なるストリームを読み取って、データを受け入れる最初のストリームから値を返すこともできます。

redis.xread('BLOCK', 10000,
            'STREAMS',
              'site:pdx', 'site:global',
              '1543947171460-0', '$');

この例では、ID 1543947171460-0より新しいデータがある場合にコマンドが返されます。 site:pdxに書かれています 、または any 新しいデータはsite:globalに書き込まれます 。

消費者の調整

ストリームに大量のデータが流れるようになったら、インバウンドメッセージを処理するコンシューマーの複数のコピーが必要になる場合があります。これらの消費者は、メッセージを受け取り、それに対してアクションを実行してから、作業が完了したことを「確認」する必要があります。 Redisストリームは、これらの操作のプリミティブも提供します。ここに入るには多すぎますが、詳細についてはドキュメントを参照してください。

ストリームコマンドのクイックリファレンス

ストリームで利用できる新しいRedisコマンドがたくさんあります!こちらがクイックリファレンスです。詳細については、公式ドキュメントをご覧ください。

簡単なコマンド:

  • XADD:アイテム(キーと値のペアのバンドル)をストリームに追加します
  • XRANGEおよびXREVRANGE:範囲を選択するか、ストリームアイテムを反復処理します
  • XREAD:一部のIDよりも新しいアイテムをフェッチします(オプションでブロックします)
  • XTRIM:古いアイテムを破棄してストリームをトリミングします
  • XDEL:ストリームから特定のアイテムを削除します
  • XLEN:ストリーム内のアイテムをカウントします
  • XINFO:ストリームメタデータを検査する

消費者グループのコマンド:

  • XGROUP:コンシューマーグループを作成、削除、またはリセットし、そのメンバーを削除します
  • XREADGROUP:XREAD(上記)と同様ですが、コンシューマーグループを使用してメッセージを受信します
  • 支出​​:消費者グループに配信されたが確認されていないメッセージを検査します。
  • XACK:コンシューマーグループへのメッセージを確認し、保留リストから削除します
  • XCLAIM:死んだ消費者からのメッセージを引き継ぐ
ユースケース

Streamsの最も一般的に引用されるユースケースは、センサーがデータをストリームに配置して、消費者がさまざまな方法(分析、アーカイブからコールドストレージ、グラフに表示)で使用できるようにするIoTワークロードです。ここでは、上限サイズのストリームが優れたユースケースであり、予測可能なメモリフットプリントで固定サイズのストリームを割り当てることができます。

ストリームは、以前にotherRedisデータ構造を使用していたアプリケーションでの使用にも適しています。 CeleryやSidekiqなどのキューイングアプリケーションは、Streamsのコンシューマーグループを利用して、Redisネイティブな方法で読み取りレシートの検査を提供できます。 pubsubはクライアントに公開された後もメッセージを保持しないため、RedisStreamsでより堅牢にすることができるRedispubsubを使用した単純なチャットアプリを示すブログ投稿がたくさんあります。

試してみる準備はできましたか?

数回クリックするだけで、新しいRedisGreenサーバーでRedisストリームを試すことができます。


  1. CloudflareワーカーとのRedis@Edge

    エッジでのコンピューティングは、近年最もエキサイティングな機能の1つです。 CDNを使用すると、ファイルをユーザーに近づけることができます。エッジコンピューティングを使用すると、アプリケーションをユーザーの近くで実行できます。これは、開発者がグローバルに分散されたパフォーマンスの高いアプリケーションを構築するのに役立ちます。 Cloudflare Workersは、現在この分野の主要製品です。コールドスタートのないサーバーレス処理環境を提供します。 Cloudflareのグローバルネットワークを活用して、アプリケーションのレイテンシーを最小限に抑えます。関数はJavascript、Rust、

  2. REDIS(REmote DIrectory Server)–Redisチュートリアル

    Redisは、オープンソース(BSDライセンス)のNoSQLデータベースです。これはインメモリデータベースです 、Key-Valueストアの概念に基づく 。 Redisはデータ構造ストアとも呼ばれます 。 すべての用語を1つずつ理解しましょう:- Key-Valueデータストア:- これはデータストレージパラダイムであり、データベースに保存されている値を一意に識別するキーに対する値としてデータが保存および取得されます。 Redisは巨大なハッシュテーブルと見なすことができます。 インメモリデータベース:- インメモリデータベースは、すべてのデータをメインメモリ(RAM)に格納するデー