4. GridData Analytics Scale Serverの利用方法¶
4.1. HDFSを操作する¶
本節では、GridData Analytics Studioのupyter Notebookから、 GridData Analytics Scale ServerのHDFSを操作する方法について説明します。
4.1.1. PythonからHDFSを操作する¶
Jupyter NotebookのPythonカーネルを使用し GridData Analytics Scale ServerのHDFSにアクセスする場合、以下のように pyarrowを使用します
# 環境変数の設定
import os
os.environ["JAVA_HOME"] = "/usr/java/default"
os.environ["JAVA_LIB"]="/usr/java/default/jre/lib/amd64/server/"
os.environ["HADOOP_HOME"] = "/home/griddata/griddata-core/hadoop"
CLASSPATH=!`$HADOOP_HOME/bin/hdfs classpath --glob`
os.environ["CLASSPATH"] = ''.join(CLASSPATH)
os.environ["ARROW_LIBHDFS_DIR"] = "/home/griddata/griddata-core/hadoop/lib/native"
os.environ['HADOOP_CONF_DIR'] ="/home/griddata/griddata-core/hadoop/conf"
# PyArrowによるHDFS接続
import pyarrow as pa
hdfs=pa.HdfsClient('SERVER1',8020,'griddata')
このとき、pa.hdfs.connectの引数には、GridData Analytics Scale ServerのアクティブなNameNode(ここではSERVER1)を指定します。
pyarrowを使用したHDFSの操作例は以下の通りです。
# ファイルの表示
hdfs.ls("/")
# ディレクトリの作成
hdfs.mkdir("/dir")
# ファイル内容の表示
hdfs.cat("/test.txt")
# ファイルやディレクトリの削除
hdfs.delete("/test.txt")
その他の操作コマンドについては、 pyarrowの公式ページ を参照してください。
4.2. Sparkクラスタ上で処理を実行する¶
本節では、pysparkを使用し GridData Analytics StudioやGridData Analytics Serverから、 GridData Analytics Scale Serverで分散処理を行う方法を説明します。
(1)環境変数PYSPARK_DRIVER_PYTHONおよびPYSPARK_PYTHONを設定します。
Python2またはPython3カーネルのノートブックにて、以下のように環境変数を設定します。
# 環境変数の設定
import sys
import os
os.environ['JAVA_HOME'] = "/usr/java/default"
os.environ['SPARK_HOME'] = "/home/griddata/griddata-core/spark/"
os.environ['JAVA_LIB'] = "/usr/java/default/jre/lib/amd64/server/"
os.environ['PYTHONPATH'] = "/home/griddata/griddata-core/spark/python/lib/py4j-0.10.4-src.zip:/home/griddata/griddata-core/spark/python"
os.environ['HADOOP_CONF_DIR'] ="/home/griddata/griddata-core/hadoop/conf"
# 実行するPythonカーネルに応じ、環境変数PYSPARK_DRIVER_PYTHON、PYSPARK_PYTHONを設定
if sys.version_info.major == 2:
%env PYSPARK_DRIVER_PYTHON=/home/griddata/analytics/.pyenv/python2.7
%env PYSPARK_PYTHON=/home/griddata/analytics/.pyenv/python2.7
else:
%env PYSPARK_DRIVER_PYTHON=/home/griddata/analytics/.pyenv/python3.6
%env PYSPARK_PYTHON=/home/griddata/analytics/.pyenv/python3.6
(2)SparkContextの作成
from pyspark.sql import SparkSession
# sparksessionの作成
spark = SparkSession.builder.appName("PythonPi3").getOrCreate()
# sparksessionからspark.sparkContextが作成されることを確認する
spark.sparkContext
(3)SparkContextを使用した並列処理の実行
並列処理を実行するには、あらかじめ処理を行いたいデータに対しspark.sparkContext.parallelize()を実行します。 これにより、データはRDDと呼ばれる並列処理用のデータ形式に変換されます。 その後、RDDに対しmapやreduce、collectといった処理内容を記述・実行します。
具体例として、pysparkを使用して円周率を計算する例を以下に記載します。
# pysparkを使用した円周率の計算サンプル
from random import random
from operator import add
n = 100000 * 2
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
#SparkContextを使用した並列処理の実行
count = spark.sparkContext.parallelize(range(1, n + 1), 3).map(f).reduce(add)
# 結果の表示
print("Pi = %f" % (4.0 * count / n))
# sparksessionの終了
spark.stop()
RDDの詳しい操作方法については、 Sparkの公式ページ を参照してください。