1. チュートリアル

この章では、、GridData Lakeの利用例の一つとして、 ローカルマシン上にあるログデータをGridData Lakeに送信し、またそのデータを集計しGridDBに格納する場合の ケースを例に紹介します。

なお、FluentdやHDFS、GridDBなど、各項目の利用方法に関する詳細は、 GridData Lake の該当する項目を参照してください。

1.1. Fluentdを使ってデータを収集する

本項ではFluentdのディストリビューションである、td-agentを元に説明します。

(1)プラグインのインストール

http出力用のプラグインfluent-plugin-out-httpを、ローカルマシンのtd-agentにインストールします

# td-agent-gem install fluent-plugin-out-http

(2)設定ファイルの編集

td-agentの設定ファイルtd-agent.confを編集し、データの収集および送信設定を行います。 ここではApacheのアクセスログを収集します。

<source>
 @type tail
  format apache2
  refresh_interval 5s
  pos_file /var/log/td-agent/tail.pos
  tag apache.access
</source>
<filter apache.*>
  @type record_transformer
  <record>
    tag ${tag}
  </record>
</filter>
<match apache.*>
  type http
  endpoint_url          http://XXXX:8080/fluentd/endpoint
  flush_interval 10s
  serializer            json
  http_method            json
  open_timeout          5
  read_timeout          10
  raise_on_error        true
  use_ssl               fals
  raise_on_http_failure true
  authentication        none
  username     ''
  password     ''
</match>

(3)データの収集及び送信

td-agentを起動することで、新たに発生したアクセスログに対しログの収集および送信が行われます。

# service td-agent start

1.2. HDFSのデータを集計し、GridDBに登録する

(1)GridData Lakeにて、Sparkの対話処理環境であるSpark-shellを起動します

$ $SPARK_HOME/sbin/spark-shell

(2)処理内容を記述し実行します。記述例は以下の通りです。

import com.toshiba.mwcloud.datalake._

# データの読み込みと集計
val DF = spark.read.json("/home/griddata/apache/access.json")
val countDF = DF.groupBy("host").count("count").sort()

# GridDBへ書き込み
countDF.write.griddb("accesshost_count")