1. チュートリアル¶
この章では、、GridData Lakeの利用例の一つとして、 ローカルマシン上にあるログデータをGridData Lakeに送信し、またそのデータを集計しGridDBに格納する場合の ケースを例に紹介します。
なお、FluentdやHDFS、GridDBなど、各項目の利用方法に関する詳細は、 GridData Lake の該当する項目を参照してください。
1.1. Fluentdを使ってデータを収集する¶
本項ではFluentdのディストリビューションである、td-agentを元に説明します。
(1)プラグインのインストール
http出力用のプラグインfluent-plugin-out-httpを、ローカルマシンのtd-agentにインストールします
# td-agent-gem install fluent-plugin-out-http
(2)設定ファイルの編集
td-agentの設定ファイルtd-agent.confを編集し、データの収集および送信設定を行います。 ここではApacheのアクセスログを収集します。
<source>
@type tail
format apache2
refresh_interval 5s
pos_file /var/log/td-agent/tail.pos
tag apache.access
</source>
<filter apache.*>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match apache.*>
type http
endpoint_url http://XXXX:8080/fluentd/endpoint
flush_interval 10s
serializer json
http_method json
open_timeout 5
read_timeout 10
raise_on_error true
use_ssl fals
raise_on_http_failure true
authentication none
username ''
password ''
</match>
(3)データの収集及び送信
td-agentを起動することで、新たに発生したアクセスログに対しログの収集および送信が行われます。
# service td-agent start
1.2. HDFSのデータを集計し、GridDBに登録する¶
(1)GridData Lakeにて、Sparkの対話処理環境であるSpark-shellを起動します
$ $SPARK_HOME/sbin/spark-shell
(2)処理内容を記述し実行します。記述例は以下の通りです。
import com.toshiba.mwcloud.datalake._
# データの読み込みと集計
val DF = spark.read.json("/home/griddata/apache/access.json")
val countDF = DF.groupBy("host").count("count").sort()
# GridDBへ書き込み
countDF.write.griddb("accesshost_count")