こんにちは、Customer Analytics Divisionの石川航作と申します。Customer Analytics Divisionはお客様が展開するサービスの改善を目的としたコンサルティング業務を行っております。このサービスは1,000万人以上の会員様にご利用頂いており、一般的なデータ分析でお馴染みのpandasやscikit-learnを用いて分析することは時間的制約から困難です。そこで、弊社では分散処理システムであるSparkとそのpython APIであるPySparkを用いて諸々の分析を行っております。
本記事では、PySparkを用いて線形回帰モデルを実装する方法をご紹介します。特に、PySparkならではの処理、線形回帰モデルを作成する上で必要な前処理、モデル作成後の評価についても詳しい説明を記載するように心がけました。
なお、本記事は筆者が個人的に書いたQiitaの記事をTech Blogのために加筆・修正したものです。
データの読み込み
本稿では、Boston Housingをサンプルデータにして説明していきます。下記のようにして、scikit-learnからBoston Housingのデータセットを読み込み、pyspark.DataFrameに変換しておきます。
なお、scikit-learnにおいてはBotson Housingはv1.0から非推奨となっており、v1.2で利用不可となるそうです。利用の際は注意していただければと思います。(参考URL:sklearn.datasets.load_boston )
# pyspark関連のライブラリ
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Boston Housingデータセットデータセットの読み込み
from sklearn.datasets import load_boston
dataset = load_boston()
# Boston Housingデータセットをpandas.DataFrameに変換
import pandas as pd
# 説明変数取得
data_x = pd.DataFrame(dataset.data, columns=dataset.feature_names)
# 目的変数を取得
data_y = pd.DataFrame(dataset.target, columns=['target'])
# 説明変数と目的変数を結合して、1つのDataFrameにする
data = pd.merge(data_y, data_x, how="left", left_index=True, right_index=True)
# pysparkDataFrameに変換
df = spark.createDataFrame(data)
データの前処理
本節では、以下の3つの順にデータの前処理についてご説明します。
- 基本統計量(各カラムの行数・平均・分散・最小値・最大値・欠損値の個数)出力の実装方法
- 分析結果の解釈性を高めるための標準化の実装方法
- 多重共線性の評価に用いるVIF出力の実装方法
基本統計量出力の実装方法
モデルを作る前に基本統計量を確認しておくことは、データの状態を確認しておくという点で非常に重要です。ここでは、pyspark.DataFrameに含まれる各カラムの行数・平均・分散・最小値・最大値・欠損値の個数を計算するスクリプトをご紹介します。
まずは、行数の確認です。
# 行数の確認
print("行数:", df.count())
次に、平均・分散・最小値・最大値を計算します。pyspark.ml.stat.Summarizerを使います。
from pyspark.ml.stat import Summarizer from pyspark.ml.feature import import VectorAssembler from pyspark.sql.functions import col # 基本統計量を指定 summarizer = Summarizer.metrics("mean", "variance", "min", "max") # featuresを作成 vector_assembler = VectorAssembler( inputCols = df.columns, # 各基本統計量を計算したいカラムを指定 outputCol = "features" ) df_features = ( vector_assembler .transform(df) ) # 基本統計量の計算 df_statistics = df_features.select(summarizer.summary(col("features")))
出力したdf_statisticsに、各カラムの基本統計量が保存されます。カラム数が多い場合はjupyterで全て表示されないので、csvファイル等に出力して観察すると良いです。また、スクリプト中に「# featuresを作成」という箇所がありますが、これについては後述の「線形回帰モデル作成」のパートでご説明します。
最後に欠損値の個数です。
# 各列の欠損値の個数の確認
from pyspark.sql.functions import col
null_count_list = [df.where(col(c).isNull()).count() for c in df.columns] #各列の欠損値の個数を計算
null_count_df = pd.DataFrame(columns = df.columns) #欠損値の個数を格納するdataFrameを作成
null_count_df.loc["null_count"] = null_count_list
標準化の実装方法
データを以下の式に基づいて、平均0、分散1となるように変換することを標準化と呼びます。なお、今後特別に断らない限り、Nはサンプルサイズを表します。線形回帰モデルにおいて、インプットとなるデータを標準化することで推定された回帰係数の大きさを比較することが可能になります。
PySparkにはpyspark.ml.feature.StandardScalerというメソッドが既に実装されており、これを用いることでデータの標準化を実行することが可能です。しかし、各カラムの名前をコントロールでき、かつ変換を実行するスクリプトがわかりやすくなるように、今回は各カラムを一つずつ標準化する方法をご紹介します。
# IDを振っておく(便利のため)
# import
from pyspark.sql.functions import monotonically_increasing_id
#IDを連番で振る
df_id = df.withColumn('ID', monotonically_increasing_id())
# 標準化
# import
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 目的変数、説明変数の抽出
var = df_id.columns
var.remove("ID") #ID列を排除
# 標準化
z_df = df_id
for c in var:
tmp1_z_df = (
df_id
.withColumn('mean', F.mean(c).over(Window.orderBy( )))
.withColumn('std', F.stddev(c).over(Window. orderBy()))
.withColumn(c+'_z_scaled', (F.col(c) - F.col('mean')) / F.col('std')) # 実際に標準化している箇所はココ!
)
tmp2_z_df = tmp1_z_df.select("ID", c+"_z_scaled")
z_df = z_df.join(tmp2_z_df, ["ID"], "left")
VIF出力の実装方法
線形回帰モデルには多重共線性という問題があります。多重共線性とは、相関が高い2つ以上の特徴量を線形回帰モデルにインプットしたときに、推定されたパラメータが不安定になってしまうという問題です。ここでは、VIF(Variance Inflaction Factor)という指標を用いて多重共線性が発生しそうな変数を排除します。
VIFとは、各説明変数が他の説明変数でどれくらい説明できるかを表す指標です。VIFは2つのステップを経て計算されます。
まず、説明変数X_kに対して、X_kを目的変数、X_1, …, X_k-1, X_k+1, X_pを説明変数とした線形回帰モデルを作成します。
次に、上記の線形回帰モデルから得られた決定係数R_kを用いてVIFを計算します。
このようにして得られたVIFについて、一般的に分析者が設定する閾値を超える説明変数があったときに、その変数を説明変数から除外することで多重共線性を回避します。今回は閾値としてよく使われる10を超えた変数は存在しなかったため、変数の除外は行いませんでした。VIFについての詳細はKutner, M., et al(2005)を参照してください。
VIFをPySparkで計算するスクリプトは下記です。なお、スクリプト中に「# featuresの作成」とありますが、これについては後述します。
# import
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# VIFを格納するリスト
VIF_list = []
# 説明変数の取得
tmp_ex_var = z_df.columns
ex_var = [ev for ev in tmp_ex_var if "_z_scaled" in ev] # 標準化した変数のみ取得
ex_var.remove("target_z_ scaled") # 目的変数を排除
# VIFの計算
for c in ex_var:
# featuresの作成
##VIF計算時に説明変数となる変数の取得
VIF_ex_var = [c2 for c2 in ex_var if c2 != c]
## VectorAssemblerインスタンス作成
vector_assembler = VectorAssembler(
inputCols = VIF_ex_var,
outputCol='features'
)
## 線形回帰モデルのインプットとなるpyspark. DataFrameの作成
VIF_z_df = (
vector_assembler
.transform(z_df)
.withColumnRenamed(c, "label")
)
# 線形回帰
lr = LinearRegression(regParam = 0.0, fitIntercept = False)
lrModel = lr.fit(VIF_z_df)
trainig_summary = lr.summary
## VIFの計算
VIF_list.append(1/(1-trainig_ summary.r2))
## 今何を計算しているか可視化
print("目的変数:", c, "\n")
print("説明変数:", VIF_ex_var, "\n")
# pandasDataFrameに格納してVIFを見やすくする
VIF_df = pd.DataFrame(index = ex_var)
VIF_df["VIF"] = VIF_list
ここでは、VIFが10を超える変数が存在しなかったので、説明変数を取り除くことはせずに次に進みます。
線形回帰モデル作成
前処理が済んだらいよいよ線形回帰モデルを作成します。ここでは、pyspark.LinearRegressionを使います。ただし、LinearRegressionを使う際には、pyspark.DataFrameを変形する必要があります。下図を見てください。左側が特に何も処理をしていないpyspark.DataFrameで、右側がLinearRegressionのインプットとなるDataFrameです。右側のDataFrameには説明変数をリスト化した値を持つfeaturesというカラムが追加されています。
実は、pyspark.LinearRegressionは説明変数の入力をリストで受け付けるので、データセットもそれに合わせて変換する必要があります。上記スクリプト内で「# featuresの作成」という処理がありましたが、そこでは説明変数を一つのカラムにまとめる作業を行っています。なお、説明変数をリスト化した値を持つカラムの名前は自由につけられますが、LinearRegressionのデフォルトがfeaturesを説明変数として線形回帰モデルを作成するメソッドであるため、今回はfeaturesというカラム名にしています。
featuresを作成するためのスクリプトは下記です。VectorAssemblerを使います。
# import
from pyspark.ml.feature import VectorAssembler
# featuresの作成
vector_assembler = VectorAssembler(
inputCols = ex_var, # featuresに入れたいカラムをリストで指定
outputCol = 'features'
)
feature_z_df = (
vector_assembler
.transform(z_df) # featuresが含まれたpyspark. DataFrameの作成
.withColumnRenamed(target_var, "label") # LinearRegressionで指定する目的変数のデフォル トがlabelなのでlabelにカラム名を変えた
)
ここまで来たら、後はfeature_z_dfをLinearRegressionメソッドの引数とすることで、線形回帰モデルを作成することができます。スクリプトは下記です。
# 線形回帰モデルの作成
lr = LinearRegression(regParam = 0.0, fitIntercept = True) # 定数項がある場合
# lr = LinearRegression(regParam = 0.0, fitIntercept = False) # 定数項がない場合
lrModel = lr_intercept.fit(feature_z_df)
また、作成したモデルに対して予測値を計算するスクリプトは下記です。
# 予測値の計算
predict_summary = lrModel.evaluate(feature_z_df)
モデル評価
線形回帰モデルの作成まで終わりました。本節では、作成したモデルの評価を行う方法についてご説明します。以下の2つの手順でモデル評価をしていきます。
1.作成したモデルが線形回帰モデルの仮定を満たしているか確認
・残差の分散は均一か確認
・残差が正規分布にしたがっているか確認
2.モデル選択基準の計算
・AIC
・汎化性能
線形回帰モデルの仮定を満たしているか?
残差の分散は均一か?
ここでは、残差の分散が均一であるかどうかを確認します。ただし、PySparkよりも散布図のプロットを簡潔に記載できるpandas.Dataframeにより可視化します。
# Pandasに変換
sampled_predict_residuals_pd_ df = (
predict_residuals_df
.select("prediction", "residuals")
.toPandas() #Pandasに変換
)
# 散布図(予測値×残差)作成
import matplotlib.pyplot as plt
plt.scatter(sampled_predict_ residuals_pd_df["prediction"], sampled_predict_residuals_pd_ df["residuals"])
plt.axhline(y = 0, xmin = min(sampled_predict_residuals_ pd_df["prediction"]), xmax = max(sampled_predict_residuals_ pd_df["prediction"]), color = "black", linestyle = "dotted")
上記でpandas.DataFrameに変換していますが、データが巨大な場合、pandas.DataFrameへの変換が困難なこともあると思います。その際は、pyspark.DataFrameに含まれるすべてのデータを確認するのではなく、ランダムに抽出した一部のデータを確認することをお勧めします。下記のスクリプトでデータをランダムに抽出することができます。
# ランダムサンプリングする割合
rate = 0.1
# ランダムサンプリング & Pandasに変換
sampled_predict_residuals_pd_ df = (
predict_residuals_df
.select("prediction", "residuals")
.sample(rate) # ランダムサンプリングしている箇所はココ!
.toPandas() #Pandasに変換
)
描いた散布図は下記です。
残差が正規分布にしたがっているか?
ここでは、QQプロットを作成して確認します。
# QQプロット作成
import scipy.stats as stats
stats.probplot(sampled_predict_residuals_pd_df["residuals"], dist = "norm", plot = plt)
plt.show()
作成した図は下記です。
モデルの選択基準
ここでは、モデルの選択基準であるAIC(Akaike’s Information Criterion)と汎化性能の測定をします。なお、今回は汎化性能はRMSE(Root Mean Squared Error)を指標としてホールドアウト法で実装しています。また、本来は複数のモデルを作成し、比較することでAICや汎化性能などの指標は意味を持ちます。しかし、本記事の趣旨は実装方法を説明することなので、モデルの比較は行いません。
AIC
線形回帰モデルにおけるAICを計算するスクリプトは下記です。誤差項が平均0、分散一定の正規分布にしたがっていることを仮定しています。ちなみに今回のモデルは、AIC=780.3でした。
# サンプルサイズ
N= feature_z_df.count()
# パラメータの個数
k = len(ex_var)+1 # 定数項がある場合
# k = len(ex_var) # 定数項がない場合
# 残差の2乗和の計算
var_hat = (
predict_residuals_df
.withColumn("residulas_2", col("residuals")**2)
.agg(
F.sum("residulas_2").alias(" sum_residulas_2")
)
.collect()[0]
.sum_residulas_2
)/(N-k)
# AICの計算
from math import log, sqrt, pi
AIC = -2*N*log(1/sqrt(2*pi*var_hat)) +N+k
print("AIC = ", AIC)
汎化性能
汎化性能を計算するためのスクリプトは下記です。今回のモデルにおいてはRMSE=0.45でした。
# 訓練データとテストデータに分割
feature_z_df_train, feature_z_df_test = feature_z_df.randomSplit([0.8, 0.2], seed=2021)
# 訓練データについて定数項ありで線形回帰モデルの作成
lr = LinearRegression(regParam = 0.0, fitIntercept = True)
lrModel_train = lr_intercept.fit(feature_z_df_ train)
# テストデータについて予測値の計算
predict_summary_test = lrModel_train.evaluate( feature_z_df_test)
predict_df_test = predict_summary_test. predictions
# RMSEの計算
## 予測値と実現値を一つのdataFrameにまとめる
feature_z_df_test_id = feature_z_df_test.select("ID", "label")
predict_df_test_id = predict_df_test.select("ID", "prediction")
# 実現値・予測値の両方が入ったDataFrameの結合
predict_act_df_test = (
feature_z_df_test_id
.join(predict_df_test_id,
["ID"],
"left"
)
.select("label", "prediction")
)
# RMSEの計算
from pyspark.mllib.evaluation import RegressionMetrics
metrics = RegressionMetrics(predict_act_ df_test.rdd)
print("RMSE = %s" % metrics.rootMeanSquaredError)
終わりに
本記事では、線形回帰モデルをPySparkで実装する方法についてご説明しました。ちなみに、私が調査した限りでは、日本語でここまで網羅的にまとめた記事はありませんでした。
冒頭でも述べた通り、ARISE analyticsではサンプルサイズが1,000万を超えるようなデータを分析する機会が多いです。線形回帰モデルをはじめとした基本的なモデルにおいても、大規模データを分析できるような並列処理に適したライブラリが存在しなかったり、日本語でまとめられた記事が存在しなかったりと苦労することが多いです。しかし、このような大きいデータから知見を抽出することは、データサイエンティストにとって非常に魅力的な作業の一つです。この作業の助けになればと思い、本記事を執筆しました。
本記事を通じてPySparkに対する知見が深まり、また、ARISE analyticsに興味を持ってくださる方がいらっしゃれば筆者として喜ばしい限りです。最後までお読みいただきありがとうございました。
参考文献
Kutner, M., Nachtsheim, C., Neter, J., & Li, W. (2005) Applied Linear Statistical Models. McGraw-Hill Irwin, New York