1. チュートリアル

本章では、GridData Lakeの利用例の一つとして、ローカルマシン上にあるログデータをGridData Lakeに送信する手順を説明します。 また、受信したデータを集計し、その結果をGridDBに格納する手順を説明します。 なお、各手順の詳細は、 その他の機能の利用方法 の該当する項目を参照してください。

1.1. Fluentdを使ってデータを収集する

Fluentdのディストリビューションの一つである、td-agentがクライアントマシンにインストールされているとします。

(1)HTTP出力用のプラグインfluent-plugin-out-httpを、クライアントマシンのtd-agentに追加インストールします。

$ td-agent-gem install fluent-plugin-out-http

(2)td-agentの設定ファイルtd-agent.confを編集し、データの収集・送信に関する設定を行います。 ここではApacheのアクセスログを収集します。

<source>
 @type tail
  format apache2
  refresh_interval 5s
  pos_file /var/log/td-agent/tail.pos
  tag apache.access
</source>
<filter apache.*>
  @type record_transformer
  <record>
    tag ${tag}
  </record>
</filter>
<match apache.*>
  type http
  endpoint_url          http://XXXX:8080/fluentd/endpoint
  flush_interval 10s
  serializer            json
  http_method            json
  open_timeout          5
  read_timeout          10
  raise_on_error        true
  use_ssl               fals
  raise_on_http_failure true
  authentication        none
  username     ''
  password     ''
</match>

(3)td-agentを起動すると、新たに生成・追記されたアクセスログを自動的に収集し、送信します。

$ service td-agent start

1.2. HDFSのデータを集計し、GridDBに登録する

(1)Sparkの対話型評価環境であるSpark-shellを起動します。

$ $SPARK_HOME/sbin/spark-shell

(2)td-agentから受信したデータがaccess.jsonとしてHDFSに格納されています。このファイルを読み込み、ホストごとのアクセス回数を集計します。 集計結果をGridDBに格納します。

処理内容をScalaコードで記述し、Spark上で実行します。

# データの読み込みと集計
val DF = spark.read.json("/home/griddata/apache/access.json")
val countDF = DF.groupBy("host").count("count").sort()

# GridDBへ書き込み
countDF.write.format("com.toshiba.mwcloud.datalake.datasource").save("table_name")