データベース
 Computer >> コンピューター >  >> プログラミング >> データベース

Logstashを使用してCSVデータをElasticsearchにロードする

元々は2015年9月10日にObjectRocket.com/blogで公開されました

まったく新しいElasticsearch®インスタンスがありますが、検索したいすべての有用なデータは CSVにあります ファイル?問題ありません。 Logstash®を使用すると、ほとんどすべてのデータをElasticsearchインデックスで簡単に検索できるものに変換できます。

Logstashを使用してCSVデータをElasticsearchにロードする

まず、これらの例を使用するには、Unix®に似たデータと環境が必要です。 Windows®は、若干の調整を加えるだけで正常に動作します。この場合、DavisVantagePro2®からデータをエクスポートする必要がありました。 ウェザーステーション、.CSV フォーマットし、それを使用して新しいインデックスを作成します。

ローカルファイルに保存された、これらに類似した数百万行から始めました:

$ head -3 /home/erik/weather.csv
HumOut,TempIn,DewPoint,HumIn,WindDir,RainMonth,WindSpeed,RainDay,BatteryVolts,WindChill,Pressure,time,TempOut,WindSpeed10Min,RainRate
76,78.0,78.227017302825,44,109,2.0,2,0.0,1.236328125,90.87261657090625,29.543,2015-06-18T17:49:29Z,86.5,1,0.0
76,78.0,78.227017302825,44,107,2.0,2,0.0,1.236328125,90.87261657090625,29.543,2015-06-18T17:49:45Z,86.5,1,0.0
76,78.0,78.32406784157725,44,107,2.0,0,0.0,1.236328125,90.83340000000001,29.543,2015-06-18T17:50:00Z,86.59999999999999,1,0.0

注: この実験を機能させるには、少なくとも1つのデータソースが必要です。

データを取得したら、開始できます。まず、Javaのバージョンがインストールされていることを確認します。

$ java -version
openjdk version "1.8.0_51"

Java仮想マシン (JVM)はこれに適しています—OpenJDK®、Oracle®など。

$ curl -O https://download.elastic.co/logstash/logstash/logstash-1.5.4.tar.gz
$ tar xfz logstash-1.5.4.tar.gz
$ cd logstash-1.5.4
$ mkdir conf

次に、構成ファイルを作成します。

まず、inputを定義します Logstashを伝えるセクション データの場所:

input {
    file {
        path => "/home/erik/weather.csv"
        start_position => beginning

    }
}

これは、 Logstashに通知するだけです どこを見て、ファイルの最初からロードしたいのか。次に、フィルターが必要です。Logstashには、デフォルトで使用可能なフィルタープラグインが多数あります。この例では、カップルを使用してデータを解析します。これまでのところ、Logstashはファイル内のデータについて何も知りません。さまざまなフィールドの処理方法について、形式やその他の詳細を指定する必要があります。

filter {
    csv {
        columns => [
          "HumOut",
          "TempIn",
          "DewPoint",
          "HumIn",
          "WindDir",
          "RainMonth",
          "WindSpeed",
          "RainDay",
          "BatteryVolts",
          "WindChill",
          "Pressure",
          "time",
          "TempOut",
          "WindSpeed10Min",
          "RainRate"
        ]
        separator => ","
        remove_field => ["message"]
        }
    date {
        match => ["time", "ISO8601"]
    }
    mutate {
        convert => ["TempOut", "float"]
    }
}

列は一目瞭然ですが、ここで詳しく説明します。まず、この例ではmessageを削除します フィールド。行全体を含むエントリです。特定の属性を検索しているので、それは必要ありません。次に、timeを指定します フィールドにISO8601-formatted dateが含まれています Elasticsearch 普通の文字列ではないことを知っています。最後に、mutate関数を使用してTempOutを変換します 値を浮動小数点数に変換します。

次に、次のコードを使用してデータを取り込み、Elasticsearchに保存した後に解析します。

output {
    elasticsearch {
        protocol => "https"
        host => ["iad1-20999-0.es.objectrocket.com:20999"]
        user => "erik"
        password => "mysupersecretpassword"
        action => "index"
        index => "eriks_weather_index"
    }
    stdout { }
}

最後に、ホストとポート、認証データ、およびそれを格納するインデックスの名前を構成します。

さて、それを起動しましょう。動作している場合は、次のようになります。

$ bin/logstash -f conf/logstash.conf -v
Logstash startup completed

うまくいきましたか? Elasticsearchに質問する:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/_cat/indices?v'
health status index               pri rep docs.count store.size pri.store.size
green  open   eriks_weather_index 5   1   294854     95.8mb     48.5mb

ドキュメントがそこにあるので、1つクエリします:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/eriks_weather_index/_search?q=TempOut:>75&pretty&terminate_after=1'

これにより、ElasticsearchはTempOutのドキュメントを検索するようになります。 75より大きい(Tempout:>75 )、人間が消費できるようにフォーマットし(かなり)、シャードごとに1つだけの結果を返す(terminate_after=1 )。次のようなものが返されるはずです:

{
  "took" : 4,
  "timed_out" : false,
  "terminated_early" : true,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
     "total" : 5,
     "max_score" : 1.0,
       "hits" : [ {
    "_index" : "eriks_weather_index",
      "_type" : "logs",
      "_id" : "AU-yXZJIJb3HnhKvpdNC",
      "_score" : 1.0,
      "_source":{"@version":"1","@timestamp":"2015-06-22T10:24:23.000Z","host":"kibana","path":"/home/erik/weather.csv","HumOut":"86","TempIn":"79.7","DewPoint":"70.65179649787358","HumIn":"46","WindDir":"161","RainMonth":"2.7","WindSpeed":"0","RainDay":"0.36","BatteryVolts":"1.125","WindChill":"82.41464999999999","Pressure":"29.611","time":"2015-06-22T10:24:23Z","TempOut":75.1,"WindSpeed10Min":"0","RainRate":"0.0"}
    } ]
   } 
}

成功。 Logstashは、配置しているデータをElasticsearch内で簡単に再生できるものに変換するための優れたスイスアーミーナイフです。楽しんでください!

www.rackspace.comにアクセスし、セールスチャットをクリックします シンバーセーションを開始します。 フィードバックを使用する タブをクリックしてコメントしたり質問したりできます。

Rackspace Cloudの利用規約を表示するには、ここをクリックしてください。


  1. Elasticsearchのユースケーストップ5

    元々は2017年5月16日にObjectRocket.com/blogで公開されました。 「あなたが知っている、検索のための」以外に、Elasticsearch®の使用は時間とともに成長し変化し続けています。 ObjectRocketでは、しばらくの間、プラットフォーム上でホストされたElasticsearchを提供してきました。お客様の間で明確な傾向が見られ、お客様が製品をどのように使用しているかがわかります。この投稿では、ObjectRocketプラットフォームで見られるElasticSearchの上位5つのユースケースを共有します。 #1 –ログとログ分析 Elasticsear

  2. FilebeatおよびElasticsearchIngestPipelinesを使用したcsvファイルの解析

    Elasticsearch 5の最も優れた新機能の1つは、ElasticsearchクラスターにLogstashスタイルの処理を追加する取り込みノードです。これにより、別のサービスやインフラストラクチャを必要とせずに、インデックスを作成する前にデータを変換できます。しばらく前に、Logstashを使用してcsvファイルを解析する方法について簡単なブログを投稿したので、比較のために、その取り込みパイプラインバージョンを提供したいと思います。 ここで紹介するのは、Filebeatを使用してデータを取り込みパイプラインに送信し、インデックスを作成し、Kibanaで視覚化する例です。 データ 無料