4. GridData Analytics Scale Serverの利用方法

4.1. HDFSを操作する

本節では、GridData Analytics Studioのupyter Notebookから、 GridData Analytics Scale ServerのHDFSを操作する方法について説明します。

4.1.1. PythonからHDFSを操作する

Jupyter NotebookのPythonカーネルを使用し GridData Analytics Scale ServerのHDFSにアクセスする場合、以下のように pyarrowを使用します

# 環境変数の設定
import os
os.environ["JAVA_HOME"] = "/usr/java/default"
os.environ["JAVA_LIB"]="/usr/java/default/jre/lib/amd64/server/"
os.environ["HADOOP_HOME"] = "/home/griddata/griddata-core/hadoop"
CLASSPATH=!`$HADOOP_HOME/bin/hdfs classpath --glob`
os.environ["CLASSPATH"] = ''.join(CLASSPATH)
os.environ["ARROW_LIBHDFS_DIR"] = "/home/griddata/griddata-core/hadoop/lib/native"
os.environ['HADOOP_CONF_DIR'] ="/home/griddata/griddata-core/hadoop/conf"

# PyArrowによるHDFS接続
import pyarrow as pa
hdfs=pa.HdfsClient('SERVER1',8020,'griddata')

このとき、pa.hdfs.connectの引数には、GridData Analytics Scale ServerのアクティブなNameNode(ここではSERVER1)を指定します。

pyarrowを使用したHDFSの操作例は以下の通りです。

# ファイルの表示
hdfs.ls("/")
# ディレクトリの作成
hdfs.mkdir("/dir")
# ファイル内容の表示
hdfs.cat("/test.txt")
# ファイルやディレクトリの削除
hdfs.delete("/test.txt")

その他の操作コマンドについては、 pyarrowの公式ページ を参照してください。

4.2. Sparkクラスタ上で処理を実行する

本節では、pysparkを使用し GridData Analytics StudioやGridData Analytics Serverから、 GridData Analytics Scale Serverで分散処理を行う方法を説明します。

(1)環境変数PYSPARK_DRIVER_PYTHONおよびPYSPARK_PYTHONを設定します。

Python2またはPython3カーネルのノートブックにて、以下のように環境変数を設定します。

# 環境変数の設定
import sys
import os
os.environ['JAVA_HOME'] = "/usr/java/default"
os.environ['SPARK_HOME'] = "/home/griddata/griddata-core/spark/"
os.environ['JAVA_LIB'] = "/usr/java/default/jre/lib/amd64/server/"
os.environ['PYTHONPATH'] = "/home/griddata/griddata-core/spark/python/lib/py4j-0.10.4-src.zip:/home/griddata/griddata-core/spark/python"
os.environ['HADOOP_CONF_DIR'] ="/home/griddata/griddata-core/hadoop/conf"
# 実行するPythonカーネルに応じ、環境変数PYSPARK_DRIVER_PYTHON、PYSPARK_PYTHONを設定
if sys.version_info.major == 2:
    %env PYSPARK_DRIVER_PYTHON=/home/griddata/analytics/.pyenv/python2.7
    %env PYSPARK_PYTHON=/home/griddata/analytics/.pyenv/python2.7
else:
    %env PYSPARK_DRIVER_PYTHON=/home/griddata/analytics/.pyenv/python3.6
    %env PYSPARK_PYTHON=/home/griddata/analytics/.pyenv/python3.6

(2)SparkContextの作成

from pyspark.sql import SparkSession
# sparksessionの作成
spark = SparkSession.builder.appName("PythonPi3").getOrCreate()
# sparksessionからspark.sparkContextが作成されることを確認する
spark.sparkContext

(3)SparkContextを使用した並列処理の実行

並列処理を実行するには、あらかじめ処理を行いたいデータに対しspark.sparkContext.parallelize()を実行します。 これにより、データはRDDと呼ばれる並列処理用のデータ形式に変換されます。 その後、RDDに対しmapやreduce、collectといった処理内容を記述・実行します。

具体例として、pysparkを使用して円周率を計算する例を以下に記載します。

# pysparkを使用した円周率の計算サンプル
from random import random
from operator import add

n = 100000 * 2
def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

#SparkContextを使用した並列処理の実行
count = spark.sparkContext.parallelize(range(1, n + 1), 3).map(f).reduce(add)
# 結果の表示
print("Pi =  %f" % (4.0 * count / n))
# sparksessionの終了
spark.stop()

RDDの詳しい操作方法については、 Sparkの公式ページ を参照してください。