3. その他の機能の利用方法

3.1. Fluentdサーバー

ログデータ収集ソフトウェアであるFluentdからのデータを受信するサーバーにより、HDFSにデータを蓄積します。 FluentdサーバーはHTTPで送信されたJSONデータを受信し、HDFS上のファイルにJSON文字列を追記します。

以下ではFluentdサーバーに対してFluentdからデータを送信し、GridData LakeのHDFS上にデータを蓄積する方法を説明します。

3.1.1. 動作環境

GridData Lakeでは、以下のFluentdとそのプラグインからのデータ送信をサポートしています。

本体
Fluentd 0.12
出力プラグイン
fluent-plugin-out-http 0.2.0

3.1.2. Fluentd設定ファイル

Fluentdは、入出力を含む一連のデータの処理内容を、ディレクティブという単位で設定ファイルに記述します。 主なディレクティブと、Fluentdサーバーにデータを送信するための設定内容を説明します。

■ sourceディレクティブ

sourceディレクティブにはデータ収集の対象に関する設定を行います。 @type パラメーターで使用する入力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。

GridData Lakeにデータを送信する際には、タグ情報に格納先のHDFSのパス(/home/griddata以下)を記述します。 パスの区切り文字には半角ドット(.)を使用します。 1回の送信で一つのJSONデータを送信し、HDFS上のJSONファイルに一行ずつ追記します。

以下はin_tailプラグインを使用する例です。この例では、送信データをHDFSの/home/griddata/output/output_01.jsonファイルに追記します。

<source>
   @type tail
   format csv
   path /root/fluentd/input_01.csv
   pos_file /var/log/td-agent/tail.pos
   keys key1,key2,key3
   tag output.output_01
</source>

そのほかの記述方法や設定内容は、データ入力用の各プラグインの仕様に準拠します。 詳細は Fluentdの入力プラグインの公式ドキュメント を参照してください。

■ filterディレクティブ

filterディレクティブには、収集したデータに対して値の追加や修正を行う場合にその内容を記述します。 @type パラメーターで使用するフィルタープラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。

GridData Lakeにデータを送信する際には、filterディレクティブに以下に記述を追加します。

<filter **>
  @type record_transformer
  <record>
    tag ${tag}
  </record>
</filter>

この設定ではrecord_transformerプラグインを用い、入力プラグインで付与されたタグ情報を「tag」という名前で送信データに加えます。

注釈

送信データ中のタグ情報の名前を変更する場合は、サーバーとクライアント双方の設定ファイルを修正します。 fluentd-server.properties内のTAGNAMEパラメーターの値と、filterディレクティブのrecord要素の値の両方を変更した上で、 Fluentdサーバーを再起動します。

■ matchディレクティブ

matchディレクティブには、送信先の設定を記述します。 @type パラメーターで使用する出力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。

GridData Lakeにデータを送信する際には、出力プラグインとしてfluent-plugin-out-http 0.2.0を指定し、以下のパラメーターを設定します。

項目名 設定値 備考
endpoint_url http://<GridData LakeのサーバーのIP>:8080/fluentd/endpoint 送信先URL
http_method post HTTPメソッド
serializer json 読み込んだデータをサーバーに送信するときのデータ形式
use_ssl false SSL接続の有無
authentication none 認証情報の有無

以下は、fluent-plugin-out-httpを使用してGridData Lakeにデータを送信する設定ファイルの例です。

<match apache.*>
  type http
  endpoint_url          http://XX.XX.XX.XX:8080/fluentd/endpoint
  http_method           post
  serializer            json
  use_ssl               false
  authentication        none
  open_timeout          5
  read_timeout          10
  raise_on_error        true
  raise_on_http_failure true
  flush_interval 10s
</match>

3.1.3. データの収集と送信

設定ファイルを記述した後にFluentdを起動すると、自動的にデータの収集と送信が行われます。

送信データ本体の形式については、使用する入力プラグインの公式ドキュメントを参照してください。

3.2. Embulkサーバー

バッチデータ収集ソフトウェアであるEmbulkからのデータを受信するサーバーにより、HDFSにデータを蓄積します。 EmbulkサーバーはFTPで送信されたデータを受信し、HDFS上ファイルにJSON文字列を追記します。

以下ではEmbulkサーバーに対してEmbulkからデータを送信し、GridData LakeのHDFS上にデータを蓄積する方法を説明します。

3.2.1. 動作環境

GridData Lakeでは、以下のEmbulkとそのプラグインからのデータ送信をサポートしています。

本体
Embulk 0.8
出力プラグイン
embulk-out-ftp 0.1.7

3.2.2. Embulk設定ファイル

Embulkは設定ファイルをYAML形式で記述します。 このとき、主な設定パラメーターは以下の通りです。

■ exec

Embulkの動作に関する設定を指定します。

■ in

収集データとパージングに関する設定を記述します。 in.type パラメーターに使用する入力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。

GridData Lakeにデータを送信する際に、決められたプラグインやパラメーターの値を指定する必要はありません。 記述方法や設定内容は、使用する入力プラグインに準拠します。 詳細は Embulkのプラグインの公式ドキュメント を参照してください。

■ out

送信データと送信先に関する設定を行います。 out.type パラメーターに使用する出力プラグインを指定し、プラグインごとに定義された設定パラメーターを設定します。

GridData Lakeにデータを送信する際には、出力プラグインとしてembulk-out-ftp 0.1.7を指定し、以下のパラメーターを設定します。

項目名 設定値 備考
host XX.XX.XX.XX Embulkサーバーが起動しているサーバーのIP
user anonymous Embulkサーバーに接続するユーザー名
password <指定しない> Embulkサーバーに接続ユーザーのパスワード
path_prefix <保存ファイルの絶対パス(HDFS)> 区切り文字は半角スラッシュ(/)
file_ext <保存ファイルの拡張子>  
port 50021 FTP接続ポート番号
ssl false SSL接続の有無

以下は、embulk-out-ftpを使用してGridData Lakeにデータを送信する設定ファイルの例です。 この例では、送信データをHDFSの/output.csvファイルとして送信します。

out:
  type: ftp
  host: XX.XX.XX.XX
  port: 50021
  user: anonymous
  password:
  path_prefix: output
  file_ext: csv
  passive_mode: false
  ssl: false
  ssl_verify: false
  formatter:
    type: csv
    header_line: false
    charset: UTF-8
    newline: CRLF

3.2.3. データの収集と送信

作成した設定ファイルを引数にembulkを起動することで、データの送信が行われます。 なお、同名のファイルが既にHDFSに存在する場合、既存のファイルを上書きします。誤って必要なファイルを上書きしてしまわないよう注意してください。

なお、パラメーター out.sequence_formatexec.min_output_tasks の値によっては、output01.000.csvなどの連番が保存ファイル名に自動的に付与されます。

3.2.4. レジューム機能

Embulkには、異常データやネットワーク障害などで送信が途中で失敗した場合に、途中から再送信できるレジューム機能が備わっています。 Embulkでレジューム機能を有効にして送信するには、以下の二つの条件を満たす必要があります。

  • 送信時に、 -r(--resume-state) オプションを指定して実行する。
  • Inputタスクが複数になるように送信する。

-r(--resume-state) オプションは送信が失敗した場合に、送信の状態を保存するYAMLファイル(resume_stateファイル)を指定するオプションです。

# embulk run [configファイル名] [-r|--resume_state] [resume_stateファイル名]

resume_stateファイルは送信失敗時にのみ生成され、タスクごとに送信の成否が記録されています。 再送信の際も同じオプションを指定して実行すると、すでに送信が成功したタスクをスキップして送信します。

注意

EmbulkクライアントをCtrl+cなどで中断した場合、resume_stateファイルが正常に生成されず、レジューム実行ができません。 Embulkサーバーがシャットダウンした場合など、失敗のタイミングによってresume_stateファイルが正常に生成されない場合があります。 これらの場合は異常の原因を解決した上で、何度か送信を実行すると完了することがあります。ただし、送信は最初からになります。

Inputタスクの数は使用するInputプラグインによって、制御する方法が異なります。 GridData Lakeは、以下の2種類のプラグインについて、Inputタスクを複数にしてレジューム機能を有効にできることを確認しています。

Local file input plugin(Embulk組み込みプラグイン)
一つの入力ファイルにつき、一つのInputタスクを生成します。 パラメーター path または path_prefix でディレクトリーや複数ファイルを指定すると、複数のInputタスクを生成します。
embulk-input-filesplit
一つの入力ファイルを複数のInputタスクに分割して送信します。 パラメーター tasks を2以上の値にすることで、複数のInputタスクを生成します。

3.3. HDFS

HDFSは大規模データ蓄積用の分散ファイルシステムです。 HDFSでファイルを管理するには、hdfs dfsコマンドを使用します。 hdfs dfs -に続いて指定するサブコマンドには、ls、cat、mv、rm、chmodなどのunixコマンドと類似のコマンドを指定します。

(ファイルの一覧を表示)
$ hdfs dfs -ls
Found 1 items
drwxr-xr-x   - griddata supergroup          0 yyyy-mm-dd hh:mm test
(ディレクトリーの作成)
$ hdfs dfs -mkdir testdir
$ hdfs dfs -ls
Found 2 items
drwxr-xr-x   - griddata supergroup          0 yyyy-mm-dd hh:mm test
drwxr-xr-x   - griddata supergroup          0 yyyy-mm-dd hh:mm testdir

また、ローカルのファイルシステムからHDFSにファイルをコピーするにはputを、HDFSからローカルにファイルをコピーするにはgetを使用します。

■ putサブコマンドの使用例

$ ls
localfile.txt
$ hdfs dfs -put ./localfile.txt
$ hdfs dfs -ls
Found 3 items
-rw-r--r--   2 griddata supergroup          4 yyyy-mm-dd hh:mm localfile.txt
drwxr-xr-x   - griddata supergroup          0 yyyy-mm-dd hh:mm test
drwxr-xr-x   - griddata supergroup          0 yyyy-mm-dd hh:mm testdir

■ getサブコマンドの使用例

$ ls
$ hdfs dfs -ls
Found 1 items
-rw-r--r--   2 griddata supergroup          4 yyyy-dd-dd hh:mm hdfsfile.txt
$ hdfs dfs -get hdfsfile.txt
$ ls
hdfsfile.txt

その他の利用可能なコマンドについては、 Hadoopの公式ドキュメント を参照してください。

3.4. GridDBライブラリー

以下のライブラリーによりGridDBに接続します。

  • GridDB Java API ライブラリー
  • Apache Spark向けGridDBコネクター

使用方法は データ編集実行コマンド を参照してください。

3.5. データ編集実行コマンド

3.5.1. データ編集アプリケーションを実行する

以下の4種類の方法で、データ編集アプリケーションを実行することができます。

Scalaで記述したアプリケーションを実行する

$ spark-submit [--deploy-mode cluster|client] [--master yarn] --class (アプリケーションの実行クラスの完全修飾クラス名) (アプリケーションのjarの絶対パス) [アプリケーションの引数]

Pythonで記述したアプリケーションを実行する

$ spark-submit [--deploy-mode cluster|client] [--master yarn] (アプリケーションのPythonファイルの絶対パス) [アプリケーションの引数]

対話型インタープリター上でScalaコードを実行する

$ spark-shell

対話型インタープリター上でpythonコードを実行する

$ pyspark

注釈

Sparkのアプリケーション実行方式には、clusterモードとclientモードがあります。

spark-submitでscalaアプリケーションとpythonアプリケーションを実行する場合、clusterモードとclientモードのどちらも選択することができます。 他はすべてclientモードのみ選択することができます。

3.5.2. Java APIを利用する

GridData Lakeには、HDFSとGridDBのJava API ライブラリーが含まれています。 このライブラリーは環境変数CLASSPATHに登録されているため、Java/ScalaアプリケーションからAPIを呼び出すことができます。

Sparkアプリケーションでない、通常のJavaアプリケーションは以下のコマンドで実行します。

$ java -cp $CLASSPATH (アプリケーションの実行クラスの完全修飾クラス名) (アプリケーションのjarの絶対パス) [アプリケーションの引数]

GridDBのJava APIの仕様については、GridDBの製品ドキュメントを参照してください。

3.5.3. Spark-GridDBコネクターを利用する

Apache Spark向けGridDBコネクター(Spark-GridDBコネクター)は、GridDBのコンテナとSparkのDataFrameを相互に変換するAPIです。 このコネクターは環境変数CLASSPATHに登録されているため、Scala、Java、Pythonで記述されたSparkアプリケーションからAPIを呼び出すことができます。

以下はSpark-GridDBコネクターを用いてGridDBコンテナの読み書きを行うScalaソースコードの例です。

// 複数コンテナを読み込みDataFrameに変換
val df = spark.read.format("com.toshiba.mwcloud.datalake.datasource")
        .option("names", "column1,column2,column3")
        .option("types", "STRING,INTEGER,TIMESTAMP")
        .option("partitionnum", 3)
        .load("input_*")

// DataFrameを複数コンテナに格納
df.write.format("com.toshiba.mwcloud.datalake.datasource")
        .option("multiple", true)
        .save("output")

Spark-GridDBコネクターが提供するAPIは、Sparkの標準データソース接続APIであるDataSource APIの仕様に準拠しています。

■ データ読み込み(コンテナ→DataFrame)

データ読み込みには、org.apache.spark.sql.DataFrameReaderクラスの以下のメソッドを使用します。

format(source: String): DataFrameReader

特定の入力ソースを扱うDataFrameReaderインスタンスを返します。

Spark-GridDBコネクターのパッケージ名"com.toshiba.mwcloud.datalake.datasource"を引数に指定すると、Spark-GridDBコネクターを通してGridDBと接続するDataFrameReaderインスタンスを返します。

options(key: String, value: Double/Long/Boolean/String): DataFrameReader または options(options: Map[String, String]): DataFrameReader

DataFrameReaderに各種オプションを指定します。

Spark-GridDBコネクターは以下のオプションを提供します。

  • names
    読み込むコンテナのカラムの名前をカンマ区切りで列挙します。 schema メソッドを使用しない場合は必ず指定する必要があります。
  • types
    読み込むコンテナのカラムのデータ型(GridDB)をカンマ区切りで列挙します。 schema メソッドを使用しない場合は必ず指定する必要があります。 names と同じ数の要素を記述する必要があります。
  • partitionNum
    DataFrameのパーティション数を指定します。読み込むコンテナ数よりも少ない値である必要があります。指定しない場合は読み込むコンテナ数と同値とします。
  • where
    データを読み込む条件を、GridDBのTQLのwhere句として指定します。指定しない場合は条件を設けません。
  • limit
    読み込むロウ数の上限を非負整数で指定します。1コンテナ数あたりの値です。指定しない場合はロウ数の上限を設けません。
  • offset
    コンテナ内で読み込みを開始する開始位置を非負整数で指定します。指定する場合は同時に limit も指定する必要があります。指定しない場合は0とします。
schema(schema: StructType): DataFrameReader

読み込むデータのスキーマを指定します。

Spark-GridDBコネクターでは本メソッドは非推奨です。代わりにoptionメソッドで names , types パラメータを指定してください。

load(path: String): DataFrame

指定したパスのデータを読み込むDataFrameを返します。

Spark-GridDBコネクターは単一コンテナからのデータ読み込みと、同一スキーマを持つ複数コンテナからのデータ読み込みをサポートします。

  • 引数にコンテナ名を指定すると、指定したコンテナのデータを読み込むDataFrameを作成します。
  • 引数に正規表現を含むコンテナ名を指定すると、正規表現にマッチするすべてのコンテナのデータを読み込むDataFrameを作成します。

■ データ書き込み(DataFrame→コンテナ)

データ書き込みには、org.apache.spark.sql.DataFrameWriterクラスの以下のメソッドを使用します。

format(source: String): DataFrameWriter[T]

特定の入力ソースを扱うDataFrameWriterインスタンスを返します。

Spark-GridDBコネクターのパッケージ名"com.toshiba.mwcloud.datalake.datasource"を引数に指定すると、Spark-GridDBコネクターを通してGridDBと接続するDataFrameWriterインスタンスを返します。

options(key: String, value: Double/Long/Boolean/String): DataFrameWriter[T] または options(options: Map[String, String]): DataFrameWriter[T]

DataFrameWriterに各種オプションを指定します。

Spark-GridDBコネクターは以下のオプションを提供します。

  • multiple
    データを複数のコンテナに書き込むかどうかを指定します。"true"やtrueを指定すると複数コンテナに書き込みます。指定しない場合はfalseになります。
  • rowkey
    書き込み先コンテナを新たに作成する場合に、先頭のカラムをロウキーにするかどうかを指定します。"true"やtrueを指定するとロウキーを設定します。指定しない場合はfalseになります。
save(path: String): Unit

指定したパスにDataFrameのデータを書き込みます。

Spark-GridDBコネクターは引数にコンテナ名を指定すると、指定したコンテナにDataFrameのデータを分散して書き込みます。書き込み先のコンテナが存在しない場合は新たに作成します。

multiple オプションをtrueに指定している場合、「引数に指定した文字列_シーケンス番号」の名前の複数コンテナにデータを書き込みます。 コンテナ数はDataFrameのパーティション数と同じになり、各パーティションのデータが対応するコンテナに書き込まれます。

■ Scala以外の言語

PythonやJavaのコードからもDataSource APIを通してScala-GridDBコネクターを使用できます。

JavaはScalaと互換性があるため、上記ScalaのAPIをそのまま使用できます。

Pythonでは、pyspark.sql.DataFrameReader、pyspark.sql.DataFrameWriterクラスが対応します。 それぞれのクラスには、上記のScalaと同名のメソッドが用意されており、Scalaと同じようにSpark-GridDBコネクターを使用します。