1. チュートリアル¶
本章では、GridData Lakeの利用例の一つとして、ローカルマシン上にあるログデータをGridData Lakeに送信する手順を説明します。 また、受信したデータを集計し、その結果をGridDBに格納する手順を説明します。 なお、各手順の詳細は、 その他の機能の利用方法 の該当する項目を参照してください。
1.1. Fluentdを使ってデータを収集する¶
Fluentdのディストリビューションの一つである、td-agentがクライアントマシンにインストールされているとします。
(1)HTTP出力用のプラグインfluent-plugin-out-httpを、クライアントマシンのtd-agentに追加インストールします。
$ td-agent-gem install fluent-plugin-out-http
(2)td-agentの設定ファイルtd-agent.confを編集し、データの収集・送信に関する設定を行います。 ここではApacheのアクセスログを収集します。
<source>
@type tail
format apache2
refresh_interval 5s
pos_file /var/log/td-agent/tail.pos
tag apache.access
</source>
<filter apache.*>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match apache.*>
type http
endpoint_url http://XXXX:8080/fluentd/endpoint
flush_interval 10s
serializer json
http_method json
open_timeout 5
read_timeout 10
raise_on_error true
use_ssl fals
raise_on_http_failure true
authentication none
username ''
password ''
</match>
(3)td-agentを起動すると、新たに生成・追記されたアクセスログを自動的に収集し、送信します。
$ service td-agent start
1.2. HDFSのデータを集計し、GridDBに登録する¶
(1)Sparkの対話型評価環境であるSpark-shellを起動します。
$ $SPARK_HOME/sbin/spark-shell
(2)td-agentから受信したデータがaccess.jsonとしてHDFSに格納されています。このファイルを読み込み、ホストごとのアクセス回数を集計します。 集計結果をGridDBに格納します。
処理内容をScalaコードで記述し、Spark上で実行します。
# データの読み込みと集計
val DF = spark.read.json("/home/griddata/apache/access.json")
val countDF = DF.groupBy("host").count("count").sort()
# GridDBへ書き込み
countDF.write.format("com.toshiba.mwcloud.datalake.datasource").save("table_name")