3. その他の機能の利用方法¶
3.1. Fluentdサーバー¶
ログデータ収集ソフトウェアであるFluentdからのデータを受信するサーバーにより、HDFSにデータを蓄積します。 FluentdサーバーはHTTPで送信されたJSONデータを受信し、HDFS上のファイルにJSON文字列を追記します。
以下ではFluentdサーバーに対してFluentdからデータを送信し、GridData LakeのHDFS上にデータを蓄積する方法を説明します。
3.1.1. 動作環境¶
GridData Lakeでは、以下のFluentdとそのプラグインからのデータ送信をサポートしています。
- 本体
- Fluentd 0.12
- 出力プラグイン
- fluent-plugin-out-http 0.2.0
3.1.2. Fluentd設定ファイル¶
Fluentdは、入出力を含む一連のデータの処理内容を、ディレクティブという単位で設定ファイルに記述します。 主なディレクティブと、Fluentdサーバーにデータを送信するための設定内容を説明します。
■ sourceディレクティブ
sourceディレクティブにはデータ収集の対象に関する設定を行います。
@type
パラメーターで使用する入力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。
GridData Lakeにデータを送信する際には、タグ情報に格納先のHDFSのパス(/home/griddata以下)を記述します。 パスの区切り文字には半角ドット(.)を使用します。 1回の送信で一つのJSONデータを送信し、HDFS上のJSONファイルに一行ずつ追記します。
以下はin_tailプラグインを使用する例です。この例では、送信データをHDFSの/home/griddata/output/output_01.jsonファイルに追記します。
<source>
@type tail
format csv
path /root/fluentd/input_01.csv
pos_file /var/log/td-agent/tail.pos
keys key1,key2,key3
tag output.output_01
</source>
そのほかの記述方法や設定内容は、データ入力用の各プラグインの仕様に準拠します。 詳細は Fluentdの入力プラグインの公式ドキュメント を参照してください。
■ filterディレクティブ
filterディレクティブには、収集したデータに対して値の追加や修正を行う場合にその内容を記述します。
@type
パラメーターで使用するフィルタープラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。
GridData Lakeにデータを送信する際には、filterディレクティブに以下に記述を追加します。
<filter **>
@type record_transformer
<record>
tag ${tag}
</record>
</filter>
この設定ではrecord_transformerプラグインを用い、入力プラグインで付与されたタグ情報を「tag」という名前で送信データに加えます。
注釈
送信データ中のタグ情報の名前を変更する場合は、サーバーとクライアント双方の設定ファイルを修正します。 fluentd-server.properties内のTAGNAMEパラメーターの値と、filterディレクティブのrecord要素の値の両方を変更した上で、 Fluentdサーバーを再起動します。
■ matchディレクティブ
matchディレクティブには、送信先の設定を記述します。
@type
パラメーターで使用する出力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。
GridData Lakeにデータを送信する際には、出力プラグインとしてfluent-plugin-out-http 0.2.0を指定し、以下のパラメーターを設定します。
項目名 | 設定値 | 備考 |
---|---|---|
endpoint_url | http://<GridData LakeのサーバーのIP>:8080/fluentd/endpoint | 送信先URL |
http_method | post | HTTPメソッド |
serializer | json | 読み込んだデータをサーバーに送信するときのデータ形式 |
use_ssl | false | SSL接続の有無 |
authentication | none | 認証情報の有無 |
以下は、fluent-plugin-out-httpを使用してGridData Lakeにデータを送信する設定ファイルの例です。
<match apache.*>
type http
endpoint_url http://XX.XX.XX.XX:8080/fluentd/endpoint
http_method post
serializer json
use_ssl false
authentication none
open_timeout 5
read_timeout 10
raise_on_error true
raise_on_http_failure true
flush_interval 10s
</match>
3.1.3. データの収集と送信¶
設定ファイルを記述した後にFluentdを起動すると、自動的にデータの収集と送信が行われます。
送信データ本体の形式については、使用する入力プラグインの公式ドキュメントを参照してください。
3.2. Embulkサーバー¶
バッチデータ収集ソフトウェアであるEmbulkからのデータを受信するサーバーにより、HDFSにデータを蓄積します。 EmbulkサーバーはFTPで送信されたデータを受信し、HDFS上ファイルにJSON文字列を追記します。
以下ではEmbulkサーバーに対してEmbulkからデータを送信し、GridData LakeのHDFS上にデータを蓄積する方法を説明します。
3.2.1. 動作環境¶
GridData Lakeでは、以下のEmbulkとそのプラグインからのデータ送信をサポートしています。
- 本体
- Embulk 0.8
- 出力プラグイン
- embulk-out-ftp 0.1.7
3.2.2. Embulk設定ファイル¶
Embulkは設定ファイルをYAML形式で記述します。 このとき、主な設定パラメーターは以下の通りです。
■ exec
Embulkの動作に関する設定を指定します。
■ in
収集データとパージングに関する設定を記述します。
in.type
パラメーターに使用する入力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。
GridData Lakeにデータを送信する際に、決められたプラグインやパラメーターの値を指定する必要はありません。 記述方法や設定内容は、使用する入力プラグインに準拠します。 詳細は Embulkのプラグインの公式ドキュメント を参照してください。
■ out
送信データと送信先に関する設定を行います。
out.type
パラメーターに使用する出力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。
GridData Lakeにデータを送信する際には、出力プラグインとしてembulk-out-ftp 0.1.7を指定し、以下のパラメーターを設定します。
項目名 | 設定値 | 備考 |
---|---|---|
host | XX.XX.XX.XX | Embulkサーバーが起動しているサーバーのIP |
user | anonymous | Embulkサーバーに接続するユーザー名 |
password | <指定しない> | Embulkサーバーに接続ユーザーのパスワード |
path_prefix | <保存ファイルの絶対パス(HDFS)> | 区切り文字は半角スラッシュ(/) |
file_ext | <保存ファイルの拡張子> | |
port | 50021 | FTP接続ポート番号 |
ssl | false | SSL接続の有無 |
以下は、embulk-out-ftpを使用してGridData Lakeにデータを送信する設定ファイルの例です。 この例では、送信データをHDFSの/output.csvファイルとして送信します。
out:
type: ftp
host: XX.XX.XX.XX
port: 50021
user: anonymous
password:
path_prefix: output
file_ext: csv
passive_mode: false
ssl: false
ssl_verify: false
formatter:
type: csv
header_line: false
charset: UTF-8
newline: CRLF
3.2.3. データの収集と送信¶
作成した設定ファイルを引数にembulkを起動することで、データの送信が行われます。 なお、同名のファイルが既にHDFSに存在する場合、既存のファイルを上書きします。誤って必要なファイルを上書きしてしまわないよう注意してください。
なお、パラメーター out.sequence_format
や exec.min_output_tasks
の値によっては、output01.000.csvなどの連番が保存ファイル名に自動的に付与されます。
3.2.4. レジューム機能¶
Embulkには、異常データやネットワーク障害などで送信が途中で失敗した場合に、途中から再送信できるレジューム機能が備わっています。 Embulkでレジューム機能を有効にして送信するには、以下の二つの条件を満たす必要があります。
- 送信時に、
-r(--resume-state)
オプションを指定して実行する。 - Inputタスクが複数になるように送信する。
-r(--resume-state)
オプションは送信が失敗した場合に、送信の状態を保存するYAMLファイル(resume_stateファイル)を指定するオプションです。
# embulk run [configファイル名] [-r|--resume_state] [resume_stateファイル名]
resume_stateファイルは送信失敗時にのみ生成され、タスクごとに送信の成否が記録されています。 再送信の際も同じオプションを指定して実行すると、すでに送信が成功したタスクをスキップして送信します。
注意
EmbulkクライアントをCtrl+cなどで中断した場合、resume_stateファイルが正常に生成されず、レジューム実行ができません。 Embulkサーバーがシャットダウンした場合など、失敗のタイミングによってresume_stateファイルが正常に生成されない場合があります。 これらの場合は異常の原因を解決した上で、何度か送信を実行すると完了することがあります。ただし、送信は最初からになります。
Inputタスクの数は使用するInputプラグインによって、制御する方法が異なります。 GridData Lakeは、以下の2種類のプラグインについて、Inputタスクを複数にしてレジューム機能を有効にできることを確認しています。
- Local file input plugin(Embulk組み込みプラグイン)
- 一つの入力ファイルにつき、一つのInputタスクを生成します。
パラメーター
path
またはpath_prefix
でディレクトリーや複数ファイルを指定すると、複数のInputタスクを生成します。 - embulk-input-filesplit
- 一つの入力ファイルを複数のInputタスクに分割して送信します。
パラメーター
tasks
を2以上の値にすることで、複数のInputタスクを生成します。
3.3. HDFS¶
HDFSは大規模データ蓄積用の分散ファイルシステムです。 HDFSでファイルを管理するには、hdfs dfsコマンドを使用します。 hdfs dfs -に続いて指定するサブコマンドには、ls、cat、mv、rm、chmodなどのunixコマンドと類似のコマンドを指定します。
(ファイルの一覧を表示)
$ hdfs dfs -ls
Found 1 items
drwxr-xr-x - griddata supergroup 0 yyyy-mm-dd hh:mm test
(ディレクトリーの作成)
$ hdfs dfs -mkdir testdir
$ hdfs dfs -ls
Found 2 items
drwxr-xr-x - griddata supergroup 0 yyyy-mm-dd hh:mm test
drwxr-xr-x - griddata supergroup 0 yyyy-mm-dd hh:mm testdir
また、ローカルのファイルシステムからHDFSにファイルをコピーするにはputを、HDFSからローカルにファイルをコピーするにはgetを使用します。
■ putサブコマンドの使用例
$ ls
localfile.txt
$ hdfs dfs -put ./localfile.txt
$ hdfs dfs -ls
Found 3 items
-rw-r--r-- 2 griddata supergroup 4 yyyy-mm-dd hh:mm localfile.txt
drwxr-xr-x - griddata supergroup 0 yyyy-mm-dd hh:mm test
drwxr-xr-x - griddata supergroup 0 yyyy-mm-dd hh:mm testdir
■ getサブコマンドの使用例
$ ls
$ hdfs dfs -ls
Found 1 items
-rw-r--r-- 2 griddata supergroup 4 yyyy-dd-dd hh:mm hdfsfile.txt
$ hdfs dfs -get hdfsfile.txt
$ ls
hdfsfile.txt
その他の利用可能なコマンドについては、 Hadoopの公式ドキュメント を参照してください。
3.4. GridDBライブラリー¶
以下のライブラリーによりGridDBに接続します。
- GridDB Java API ライブラリー
- Apache Spark向けGridDBコネクター
使用方法は データ編集実行コマンド を参照してください。
3.5. データ編集実行コマンド¶
3.5.1. データ編集アプリケーションを実行する¶
以下の4種類の方法で、データ編集アプリケーションを実行することができます。
Scalaで記述したアプリケーションを実行する
$ spark-submit [--deploy-mode cluster|client] [--master yarn] --class (アプリケーションの実行クラスの完全修飾クラス名) (アプリケーションのjarの絶対パス) [アプリケーションの引数]
Pythonで記述したアプリケーションを実行する
$ spark-submit [--deploy-mode cluster|client] [--master yarn] (アプリケーションのPythonファイルの絶対パス) [アプリケーションの引数]
対話型インタープリター上でScalaコードを実行する
$ spark-shell
対話型インタープリター上でpythonコードを実行する
$ pyspark
注釈
Sparkのアプリケーション実行方式には、clusterモードとclientモードがあります。
spark-submitでscalaアプリケーションとpythonアプリケーションを実行する場合、clusterモードとclientモードのどちらも選択することができます。 他はすべてclientモードのみ選択することができます。
3.5.2. Java APIを利用する¶
GridData Lakeには、HDFSとGridDBのJava API ライブラリーが含まれています。 このライブラリーは環境変数CLASSPATHに登録されているため、Java/ScalaアプリケーションからAPIを呼び出すことができます。
Sparkアプリケーションでない、通常のJavaアプリケーションは以下のコマンドで実行します。
$ java -cp $CLASSPATH (アプリケーションの実行クラスの完全修飾クラス名) (アプリケーションのjarの絶対パス) [アプリケーションの引数]
GridDBのJava APIの仕様については、GridDBの製品ドキュメントを参照してください。
3.5.3. Spark-GridDBコネクターを利用する¶
Apache Spark向けGridDBコネクター(Spark-GridDBコネクター)は、GridDBのコンテナとSparkのDataFrameを相互に変換するAPIです。 このコネクターは環境変数CLASSPATHに登録されているため、Scala、Java、Pythonで記述されたSparkアプリケーションからAPIを呼び出すことができます。
以下はSpark-GridDBコネクターを用いてGridDBコンテナの読み書きを行うScalaソースコードの例です。
// 複数コンテナを読み込みDataFrameに変換
val df = spark.read.format("com.toshiba.mwcloud.datalake.datasource")
.option("names", "column1,column2,column3")
.option("types", "STRING,INTEGER,TIMESTAMP")
.option("partitionnum", 3)
.load("input_*")
// DataFrameを複数コンテナに格納
df.write.format("com.toshiba.mwcloud.datalake.datasource")
.option("multiple", true)
.save("output")
Spark-GridDBコネクターが提供するAPIは、Sparkの標準データソース接続APIであるDataSource APIの仕様に準拠しています。
■ データ読み込み(コンテナ→DataFrame)
データ読み込みには、org.apache.spark.sql.DataFrameReaderクラスの以下のメソッドを使用します。
format(source: String): DataFrameReader
特定の入力ソースを扱うDataFrameReaderインスタンスを返します。
Spark-GridDBコネクターのパッケージ名"com.toshiba.mwcloud.datalake.datasource"を引数に指定すると、Spark-GridDBコネクターを通してGridDBと接続するDataFrameReaderインスタンスを返します。
options(key: String, value: Double/Long/Boolean/String): DataFrameReader
またはoptions(options: Map[String, String]): DataFrameReader
DataFrameReaderに各種オプションを指定します。
Spark-GridDBコネクターは以下のオプションを提供します。
names
- 読み込むコンテナのカラムの名前をカンマ区切りで列挙します。
schema
メソッドを使用しない場合は必ず指定する必要があります。
types
- 読み込むコンテナのカラムのデータ型(GridDB)をカンマ区切りで列挙します。
schema
メソッドを使用しない場合は必ず指定する必要があります。names
と同じ数の要素を記述する必要があります。
partitionNum
- DataFrameのパーティション数を指定します。読み込むコンテナ数よりも少ない値である必要があります。指定しない場合は読み込むコンテナ数と同値とします。
where
- データを読み込む条件を、GridDBのTQLのwhere句として指定します。指定しない場合は条件を設けません。
limit
- 読み込むロウ数の上限を非負整数で指定します。1コンテナ数あたりの値です。指定しない場合はロウ数の上限を設けません。
offset
- コンテナ内で読み込みを開始する開始位置を非負整数で指定します。指定する場合は同時に
limit
も指定する必要があります。指定しない場合は0とします。
schema(schema: StructType): DataFrameReader
読み込むデータのスキーマを指定します。
Spark-GridDBコネクターでは本メソッドは非推奨です。代わりにoptionメソッドで
names
,types
パラメータを指定してください。load(path: String): DataFrame
指定したパスのデータを読み込むDataFrameを返します。
Spark-GridDBコネクターは単一コンテナからのデータ読み込みと、同一スキーマを持つ複数コンテナからのデータ読み込みをサポートします。
- 引数にコンテナ名を指定すると、指定したコンテナのデータを読み込むDataFrameを作成します。
- 引数に正規表現を含むコンテナ名を指定すると、正規表現にマッチするすべてのコンテナのデータを読み込むDataFrameを作成します。
■ データ書き込み(DataFrame→コンテナ)
データ書き込みには、org.apache.spark.sql.DataFrameWriterクラスの以下のメソッドを使用します。
format(source: String): DataFrameWriter[T]
特定の入力ソースを扱うDataFrameWriterインスタンスを返します。
Spark-GridDBコネクターのパッケージ名"com.toshiba.mwcloud.datalake.datasource"を引数に指定すると、Spark-GridDBコネクターを通してGridDBと接続するDataFrameWriterインスタンスを返します。
options(key: String, value: Double/Long/Boolean/String): DataFrameWriter[T]
またはoptions(options: Map[String, String]): DataFrameWriter[T]
DataFrameWriterに各種オプションを指定します。
Spark-GridDBコネクターは以下のオプションを提供します。
multiple
- データを複数のコンテナに書き込むかどうかを指定します。"true"やtrueを指定すると複数コンテナに書き込みます。指定しない場合はfalseになります。
rowkey
- 書き込み先コンテナを新たに作成する場合に、先頭のカラムをロウキーにするかどうかを指定します。"true"やtrueを指定するとロウキーを設定します。指定しない場合はfalseになります。
save(path: String): Unit
指定したパスにDataFrameのデータを書き込みます。
Spark-GridDBコネクターは引数にコンテナ名を指定すると、指定したコンテナにDataFrameのデータを分散して書き込みます。書き込み先のコンテナが存在しない場合は新たに作成します。
multiple
オプションをtrueに指定している場合、「引数に指定した文字列_シーケンス番号」の名前の複数コンテナにデータを書き込みます。 コンテナ数はDataFrameのパーティション数と同じになり、各パーティションのデータが対応するコンテナに書き込まれます。
■ Scala以外の言語
PythonやJavaのコードからもDataSource APIを通してScala-GridDBコネクターを使用できます。
JavaはScalaと互換性があるため、上記ScalaのAPIをそのまま使用できます。
Pythonでは、pyspark.sql.DataFrameReader、pyspark.sql.DataFrameWriterクラスが対応します。 それぞれのクラスには、上記のScalaと同名のメソッドが用意されており、Scalaと同じようにSpark-GridDBコネクターを使用します。