2. GridData Lake

2.1. Fluentd

Fluentdはデータの収集・集約・転送などデータ処理を行うためのOSSです。

本節では、Fluentdを利用し収集したデータを、GridData Lakeに送信する方法を説明します。

2.1.1. 事前準備

あらかじめ、送信側のマシンにFluentdとhttp送信が可能なFluentdプラグインをインストールしてください。

GridData Lakeではfluentd-plugin-out-http (https://github.com/ento/fluent-plugin-out-http) V0.2.0に対し動作確認をしています。 fluent-plugin-out-http以外のプラグインを使用する場合、設定項目は本項目を参考にしてください。

2.1.2. 設定ファイルの記述

Fluentdは、入出力を含む一連のデータの処理内容を、ディレクティブという単位で設定ファイルに記述します。 主なディレクティブと設定内容は以下の通りです。

■ sourceディレクティブ

データの収集対象に関する設定を行います。このとき、データソースごとにタグと呼ばれる識別子を付与します。 記述方法や設定内容は、データ入力用の各プラグインに準拠します。 詳細は Fluentdの入力プラグインのページ を参照してください。

■ filterディレクティブ

入力されたデータに対し、必要に応じて値の追加や修正を行います。 なお、Fluentdが収集したデータはjson形式でGridData Lakeに送信されますが、送信されるデータにはtagという名前のキーを含む必要があります。 そこで、Fluentdにバンドルされたfilter用プラグインであるrecord_transformerを使用し、 以下のように「キー名をtag、値をfluentdで使われるデータのタグ名」としたデータを含むようにfilterディレクティブで設定を行います。

<filter **>
  @type record_transformer
  <record>
    tag ${tag}
  </record>
</filter>

■ matchディレクティブ

指定したタグに対し出力設定を行います。GridData Lakeでは、以下の設定パラメータは特定の値を設定する必要があります。

項目名 設定値 備考
endpoint_url http://<GridData LakeのサーバのIP>:8080/fluentd/endpoint 送信先URL
http_method post HTTPメソッド
serializer json 読み込んだデータをサーバに送信するときのデータ形式
use_ssl false  
authentication none  

以下に、fluent-plugin-out-httpを使用してGridData Lakeにデータを送信する場合の設定ファイルの例を示します。

<match apache.*>
  type http
  endpoint_url          http://XX.XX.XX.XX:8080/fluentd/endpoint
  http_method           post
  serializer            json
  use_ssl               false
  authentication        none
  open_timeout          5
  read_timeout          10
  raise_on_error        true
  raise_on_http_failure true
  flush_interval 10s
</match>

2.1.3. データの収集と送信

設定ファイルを記述した後にFluentdを起動することで、データの収集と送信が行われます。

GridData Lakeはデータを受信すると、HDFS上にファイルとして保存します。 このとき、ファイルは/home/griddata/<Fluentdで使用したタグ名.json>という名前で保存されます。タグ名にドットを含む場合、 ドットをパスの区切り文字とし、そのディレクトリに保存します。例えば、http.accessというタグを使用した場合、 /home/griddata/http/access.jsonというファイルが作成されます。 またこのとき、受信したデータは各データをjson形式とし、一行ごとにデータが格納されます。

json形式の仕様については、データ入力用の各プラグインを参照してください。

2.2. Embulk

EmbulkはFluentdよりも大規模なデータのパージングや転送を行うためのOSSです。

本節では、Emlubkを利用し収集したデータを、GridData Lakeに送信する方法を説明します。

2.2.1. 事前準備

送信側のマシンにはあらかじめ、EmbulkおよびFTPによる送信が可能なEmbulkのプラグインをインストールしてください。 ここでは、プラグインの例としてembulk-output-ftpを元に説明を行います。 その他のプラグインを使用する場合、設定内容は本設定を参考にしてください。

2.2.2. 設定ファイルの記述

Embulkは設定ファイルをyml形式で記述します。 このとき、主な設定内容は以下の通りです。

■ exec

Embulkの動作に関する設定を指定します。

■ in

入力するデータのフォーマットを指定します。 記述方法や設定内容は、データ入力用の各プラグインに準拠します。 詳細は Embulkの設定方法のページ を参照してください。

■ out

入力したデータの出力設定を行います。 以下に、embulk-output-ftpを使用しGridData Lakeにデータを送信する場合の設定例を紹介します。

out:
  type: ftp
  host: XX.XX.XX.XX
  port: 40021
  user: anonymous
  password: anonymous
  path_prefix: output
  file_ext: csv
  passive_mode: true
  ssl: false
  ssl_verify: false
  formatter:
    type: csv
    header_line: false
    charset: UTF-8
    newline: CRLF

このとき、必要な設定項目は以下の通りです。

項目名 設定値 備考
host XX.XX.XX.XX GridData LakeのサーバのIP
user anonymous 接続するユーザ名
password anonymous 接続ユーザのパスワード。接続ユーザがanonymousの場合は不要です
port 50020 FTP接続を行う際のポート番号
passive_mode true FTPの接続モード
ssl false SSL設定

2.2.3. データの収集と送信

作成した設定ファイルを引数にembulkを起動することで、データの送信が行われます。

$ embulk run config.yml

このとき設定ファイルの設定に基づき、embulkは読み込んだデータを一度パージングしGridDataLakeに送信します。 送信されたデータはGridData LakeのHDFSに格納されます。 このとき、embulk-output-ftpを使用した場合、 ファイルは設定ファイルのpath_prefixおよびfile_extで設定した名前で保存されます。 一例としてpath_prefixにoutput、file_extにcsvと指定した場合は、output.csvというファイルが保存されます。

またこのとき、embulkの設定に基づき、ファイル名にoutput01.000.csvというように連番が付与されることがあります。 連番を付けない場合や連番の形式を変更する場合、設定ファイルでsequence_formatやmin_output_tasksの値を修正してください。 詳しくはembulkのビルドインプラグインのページ(http://www.embulk.org/docs/built-in.html#file-output-plugin) を参照してください。

2.3. HDFS

HDFSは大規模データ処理用のファイルシステムです。 HDFSでファイルを管理するには、hdfs dfsコマンドを使用します。 hdfs dfs -に続きサブコマンドを指定しますが、 サブコマンドにはls、cat、mv、rm、chmodなどいくつかの主要なunixコマンドと同名のコマンドが利用可能です。

(ファイルの一覧を表示)
$ hdfs dfs -ls
Found 1 items
drwxr-xr-x   - griddata supergroup          0 2017-09-22 20:43 test
(ディレクトリの作成)
$ hdfs dfs -mkdir testdir
$ hdfs dfs -ls
Found 2 items
drwxr-xr-x   - griddata supergroup          0 2017-09-22 20:43 test
drwxr-xr-x   - griddata supergroup          0 2017-09-27 11:03 testdir

また、サブコマンドにはput、get、appendToFile、moveFromLocalなどhdfs独自のコマンドも利用可能です。 ローカルのファイルシステムからHDFSにファイルをコピーするにはputを、HDFSからローカルにファイルをコピーするにはgetを使用します。

■ putコマンドの使用例

$ ls
localfile.txt
$ hdfs dfs -put ./localfile.txt
$ hdfs dfs -ls
Found 3 items
-rw-r--r--   2 griddata supergroup          4 2017-09-27 11:11 localfile.txt
drwxr-xr-x   - griddata supergroup          0 2017-09-22 20:43 test
drwxr-xr-x   - griddata supergroup          0 2017-09-27 11:03 testdir

■ getコマンドの使用例

$ ls
$ hdfs dfs -ls
Found 1 items
-rw-r--r--   2 griddata supergroup          4 2017-09-27 11:12 hdfsfile.txt
$ hdfs dfs -get hdfsfile.txt
$ ls
hdfsfile.txt

その他の利用可能なコマンドについては、 Hadoopのページ を参照してください。

2.4. GridDB

GridData Lakeでは、GridDBの以下の追加機能を利用可能です。

  • GridDB Java API
  • GridDB DataSource API

詳細はGridDBのAPIリファレンス、および データ編集 の各該当項目を参照してください。

2.5. データ編集

2.5.1. データ編集アプリケーションを実行する

以下の4種類の方法で、データ編集アプリケーションを実行することができます。

Scalaで記述したアプリケーションを実行する

spark-submit [--deploy-mode cluster|client] --class (アプリケーションの実行クラスの完全修飾クラス名) (アプリケーションのjarの絶対パス) [アプリケーションの引数]

Pythonで記述したアプリケーションを実行する

spark-submit (アプリケーションのPythonファイルの絶対パス) [アプリケーションの引数]

対話型インタプリタ上でScalaコードを実行する

spark-shell

対話型インタプリタ上でpythonコードを実行する

pyspark

注釈

Sparkのアプリケーション実行方式には、clusterモードとclientモードがあります。

spark-submitでscalaアプリケーションを実行する場合のみ、clusterモードとclientモードのどちらも選択することができます。 他はすべてclientモードのみ選択することができます。

2.5.2. Java APIを利用する

GridData Lakeには、HDFSとGridDBのJavaライブラリが含まれています。 これらのライブラリは環境変数CLASSPATHに登録されていますので、以下のようにすることで、JavaアプリケーションからこれらのAPIを呼び出すことができます。

java -cp $CLASSPATH (アプリケーションの実行クラスの完全修飾クラス名) (アプリケーションのjarの絶対パス) [アプリケーションの引数]

2.5.3. GridDB DataSource APIを利用する

GridDB DataSource APIは、GridDBのコンテナとSparkのDataFrameを相互に変換するAPIです。

# APIライブラリの読み込み
import com.toshiba.mwcloud.datalake._

# コンテナを読み込みDataFrameに変換
val df = session.read.griddb("table_name")

# DataFrameをコンテナに格納
df.write.griddb("write_table_name")