4. GridData Analytics Scale Serverの利用方法

GridData Analytics Scale Serverは、分散ファイルシステムHDFSと並列分散処理基盤Sparkのクラスターです。 GridData Analytics Serverと同一のライブラリー群を同梱しており、分散処理中に各ライブラリーを使用できます。

4.1. HDFSを操作する

GridData Analytics ServerのJupyter Notebookから、GridData Analytics Scale ServerのHDFSを操作します。

4.1.1. PythonからHDFSを操作する

PythonスクリプトからGridData Analytics Scale ServerのHDFSに接続するには、pyarrowを使用します。

# PyArrowによるHDFS接続
import pyarrow as pa
hdfs = pa.hdfs.connect('SERVER1',8020,'griddata')

このときpa.hdfs.connectの引数にはそれぞれ、GridData Analytics Scale ServerのアクティブなNameNodeのホスト名(ここではSERVER1)、 ポート番号、ユーザー名を指定します。ポート番号とユーザー名はデフォルトではそれぞれ8020、griddataです。

以下はpyarrowによるHDFSの接続後の操作の例です。

# ファイルの表示
hdfs.ls("/")
# ディレクトリーの作成
hdfs.mkdir("/dir")
# ファイル内容の表示
hdfs.cat("/test.txt")
# ファイルやディレクトリーの削除
hdfs.delete("/test.txt")

また、HDFS上のCSVファイルを読み込み、PandasのDataFrameを作成する場合の例を以下に記載します。

import pyarrow as pa
from io import StringIO
import pandas as pd
hdfs = pa.hdfs.connect('SERVER1',8020,'griddata')
with hdfs.open('/test.csv') as f:
    #バイトデータでファイルを取得
    bytedata = f.read(f.size())
    #Stringに変換
    strdata = bytedata.decode('utf-8')
#DataFrameに変換
data = StringIO(strdata)
df = pd.read_csv(data, sep=",",header=None)
df.columns = list('ABCDE')

その他の操作方法については、 pyarrowの公式ドキュメント を参照してください。

4.1.2. RからHDFSを操作する

RスクリプトからGridData Analytics Scale ServerのHDFSにアクセスするには、rhdfsを使用します。

library(rJava)
library(rhdfs)
hdfs.init()

以下はrhdfsによるHDFSの操作の例です。

# ファイルの表示
hdfs.ls('/')
# ディレクトリーの作成
hdfs.mkdir('/user/test')
# ファイル内容の表示
hdfs.cat("/test.txt")
# ファイルへのテキスト書き込み
data <- "helloworld"
file <- hdfs.file("helloworld.txt", "w")
data2 <- charToRaw(data)
hdfs.write(data2,file)
hdfs.close(file)

その他の操作コマンドについては、 rhdfsの公式ドキュメント を参照してください。

4.2. Sparkクラスター上で処理を実行する

GridData Analytics Scale ServerのSpark上で分散処理を実行するには、PySparkを使用します。

PySparkは以下のカーネルから使用することができます。 カーネルにより、PySparkのエントリーポイントであるSparkSessionの作成方法が異なります。

■ PySparkカーネル

 PySparkカーネルでは、SparkSessionがカーネル起動時に自動作成されます。 PySparkカーネルは使用リソースの異なる3種類が用意されています。 扱うデータサイズ、処理性能、ジョブの並列数に応じて、使用するカーネルのサイズを使い分けてください。

■ Python3カーネル

 Python3カーネルでは、SparkSessionを手動で作成します。 手動でSparkSessionを作成する際、任意の設定を付与することができます。 そのため、用意されたPySparkカーネル以外のリソースを設定し、処理を行うことができます。

各PySparkカーネルの設定と、Python3カーネルにおけるデフォルトの設定は以下の表を参照してください。

パラメータ 説明 PySpark_small PySpark_medium PySpark_large Python3 (デフォルト)
driver-memory driverのメモリー量 2g 3g 6g 1g
driver.maxResultSize driverがアクションで使用するメモリー量 1g 2g 4g 1g
num-executor executor数 1 6 6 2
executor-cores 各executorのコア数 1 1 3 1
executor-memory 各executorのメモリー量 1g 2g 8g 1g
executor.memoryOverhead 各executorのオフヒープメモリー(MB) 1024 1024 2048 384
appName アプリケーション名 PySpark_small PySpark_medium PySpark_large pyspark-shell

詳細な説明とその他のパラメータについては Apache Sparkの公式ドキュメント を参照してください。

以下にPySparkを使用し、円周率を計算する例を記載します。

(1) (Python3カーネルのみ) 環境変数を設定します。

あらかじめfindsparkをインポートし、初期化することで、Sparkの実行に必要な環境変数が設定されます。

import findspark
findspark.init()

(2) (Python3カーネルのみ) SparkContextまたはSparkSessionを作成します。

ここではdriverのメモリー量と、アプリケーション名を変更します。

from pyspark.sql import SparkSession
# sparksessionの作成
spark = SparkSession.builder.master("yarn").config("spark.driver.memory","10g").appName("PythonPi3").getOrCreate()
# sparksessionからspark.sparkContextを作成
spark.sparkContext

(3) SparkSessionを使用して並列処理を実行します。

並列処理を実行するには、あらかじめ処理を行いたいデータを、 まずSparkのデータモデルであるRDDやDataFrameに変換します。 続いて変換したRDDやDataFrameに対し、値の変換や集計などの処理内容を記述します。

PySparkカーネルではSparkSession(spark)が自動的に作成されているため、 (3)から処理を記述することができます。

# pysparkを使用した円周率の計算サンプル
from random import random
from operator import add
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

n = 100000 * 2
def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0
# 作成した関数をSparkのユーザー定義関数に登録
myudf = udf(f)
# rangeのリストをrddに変換
rdd=spark.sparkContext.parallelize(range(1, n + 1), 3)
# rddを、indexというカラム名を付けDataFrameに変換
df1=rdd.map(Row("index")).toDF()
# DataFrameにresultというカラムを追加し、indexの各データに対し関数を適用した結果を格納
df2=df1.withColumn("result", myudf("index").cast(IntegerType()))
# 結果の集計
count=df2.groupBy().sum('result').collect()[0][0]
# 結果の表示
print("Pi =  %f" % (4.0 * count / n))
# sparksessionの終了
spark.stop()

RDDやDataFrameの詳しい操作方法については、Sparkの公式ドキュメントにある以下のページを参照してください。

RDDのページ

DataFrameのページ

また、サンプルノートブックsample_pyspark(PySparkカーネル)、sample_pyspark_python3(Python3カーネル)では、上記と同様に GridData Analytics Scale ServerのSpark上で並列分散処理による円周率の計算を行っています。

4.2.1. Spark-GridDBコネクターを利用する

Spark-GridDBコネクターは、GridDBのコンテナとSparkのDataFrameを相互に変換するAPIです。 Python3、ScalaカーネルからAPIを呼び出すことができます。

以下はPython3カーネルからSpark-GridDBコネクターを使用する場合の例です。

# Sparkの初期化
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GridDBConnector").getOrCreate()

# 複数コンテナを読み込みDataFrameに変換
df = spark.read.format("com.toshiba.mwcloud.datalake.datasource")
        .option("names", "column1,column2,column3")
        .option("types", "STRING,INTEGER,TIMESTAMP")
        .option("partitionnum", 3)
        .load("input_*")

# DataFrameを複数コンテナに格納
df.write.format("com.toshiba.mwcloud.datalake.datasource")
        .option("multiple", true)
        .save("output")

Spark-GridDBコネクターが提供するAPIは、Sparkの標準データソース接続APIであるDataSource APIの仕様に準拠しています。

  1. データ読み込み(コンテナ→DataFrame)

データ読み込みには、pyspark.sql.DataFrameReaderクラスの以下のメソッドを使用します。

format(source: String): DataFrameReader

特定の入力ソースを扱うDataFrameReaderインスタンスを返します。

Spark-GridDBコネクターのパッケージ名"com.toshiba.mwcloud.datalake.datasource"を引数に指定すると、Spark-GridDBコネクターを通してGridDBと接続するDataFrameReaderインスタンスを返します。

options(key: String, value: Double/Long/Boolean/String): DataFrameReader または options(options: Map[String, String]): DataFrameReader

DataFrameReaderに各種オプションを指定します。

Spark-GridDBコネクターは以下のオプションを提供します。

  • names
    読み込むコンテナのカラムの名前をカンマ区切りで列挙します。 schema メソッドを使用しない場合は必ず指定する必要があります。
  • types
    読み込むコンテナのカラムのデータ型(GridDB)をカンマ区切りで列挙します。 schema メソッドを使用しない場合は必ず指定する必要があります。 names と同じ数の要素を記述する必要があります。
  • partitionNum
    DataFrameのパーティション数を指定します。読み込むコンテナ数よりも少ない値である必要があります。指定しない場合は読み込むコンテナ数と同値とします。
  • where
    データを読み込む条件を、GridDBのTQLのwhere句として指定します。指定しない場合は条件を設けません。
  • limit
    読み込むロウ数の上限を非負整数で指定します。1コンテナ数あたりの値です。指定しない場合はロウ数の上限を設けません。
  • offset
    コンテナ内で読み込みを開始する開始位置を非負整数で指定します。指定する場合は同時に limit も指定する必要があります。指定しない場合は0とします。
schema(schema: StructType): DataFrameReader

読み込むデータのスキーマを指定します。

Spark-GridDBコネクターでは本メソッドは非推奨です。代わりにoptionメソッドで names , types パラメータを指定してください。

load(path: String): DataFrame

指定したパスのデータを読み込むDataFrameを返します。

Spark-GridDBコネクターは単一コンテナからのデータ読み込みと、同一スキーマを持つ複数コンテナからのデータ読み込みをサポートします。

  • 引数にコンテナ名を指定すると、指定したコンテナのデータを読み込むDataFrameを作成します。
  • 引数に正規表現を含むコンテナ名を指定すると、正規表現にマッチするすべてのコンテナのデータを読み込むDataFrameを作成します。
  1. データ書き込み(DataFrame→コンテナ)

データ書き込みには、pyspark.sql.DataFrameWriterクラスの以下のメソッドを使用します。

format(source: String): DataFrameWriter[T]

特定の入力ソースを扱うDataFrameWriterインスタンスを返します。

Spark-GridDBコネクターのパッケージ名"com.toshiba.mwcloud.datalake.datasource"を引数に指定すると、Spark-GridDBコネクターを通してGridDBと接続するDataFrameWriterインスタンスを返します。

options(key: String, value: Double/Long/Boolean/String): DataFrameWriter[T] または options(options: Map[String, String]): DataFrameWriter[T]

DataFrameWriterに各種オプションを指定します。

Spark-GridDBコネクターは以下のオプションを提供します。

  • multiple
    データを複数のコンテナに書き込むかどうかを指定します。"true"やtrueを指定すると複数コンテナに書き込みます。指定しない場合はfalseになります。
  • rowkey
    書き込み先コンテナを新たに作成する場合に、先頭のカラムをロウキーにするかどうかを指定します。"true"やtrueを指定するとロウキーを設定します。指定しない場合はfalseになります。
save(path: String): Unit

指定したパスにDataFrameのデータを書き込みます。

Spark-GridDBコネクターは引数にコンテナ名を指定すると、指定したコンテナにDataFrameのデータを分散して書き込みます。書き込み先のコンテナが存在しない場合は新たに作成します。

multiple オプションをtrueに指定している場合、「引数に指定した文字列_シーケンス番号」の名前の複数コンテナにデータを書き込みます。 コンテナ数はDataFrameのパーティション数と同じになり、各パーティションのデータが対応するコンテナに書き込まれます。

ScalaやJavaのコードからもDataSource APIを通してScala-GridDBコネクターを使用できます。 org.apache.spark.sql.DataFrameReader、org.apache.spark.sql.DataFrameWriterクラスが対応します。 それぞれのクラスには、上記のPythonと同名のメソッドが用意されており、Pythonと同じようにSpark-GridDBコネクターを使用します。