Scala×SparkのUDFとウィンドウ関数を使って緯度経度データから2点間の距離を求める

ARISE Tech Blog

こんにちは、Marketing Solution Divisionの鴨居です。私は位置情報を扱う分析ソリューションの開発チームに1年ほど在籍したのちに現在のチームに移籍しました。前チームではScala×Sparkで位置情報データの分析・ロジック開発を行っていました。この記事では、Scala×Sparkを使った位置情報データ分析の例をご紹介したいと思います。

背景

これまではマーケティング分析において位置情報データが活用されることはほとんどありませんでしたが、最近では位置情報データを使ったマーケティング活動が注目されてきており、コロナ禍を経てさらに脚光を浴びています。参考までに、KDDIとARISE analyticsが共同で出しているコロナ分析レポートのリンクを貼っておきます。(こちら:主要観光地におけるGW期間中の詳細人流分析レポート

位置情報データ活用の波を受けて、私のチームでも現在位置情報を活用したマーケティング分析を検討しております。そこで、個人としては1年ぶりになるScala言語を使った位置情報関連の実装をご紹介したいと思います。

今回はスマートフォン端末で計測されるGPS位置情報データから2点間の距離を求める処理をScala sparkで実装してみました。Sparkに馴染みのない方に向けて、Sparkでよく使われるウィンドウ関数というものを意図的に使った実装を紹介していきたいと思います。この記事で紹介するソースコードはすべてScalaで書かれていますが、Pyspark(Python×Spark)で書きたい方もほとんど同じ実装で書くことができるため参考にしてみてください。

ウィンドウ関数を使って1レコード前のデータを読み込む

Sparkではウィンドウ関数(window functions)という機能を使うことができます。これはSQLなどにも実装されている機能で、対象レコード以外のレコードを参照することができるものです。今回は、以下のような時間情報を持った位置情報データがあるときに、対象レコードの前時刻の緯度経度と対象レコードの緯度経度の距離を求めます。

 

|scala|
locData.show()
||<
| timestamp      | lat       | lon        |
| 2018/2/1 10:00 | 35.680923 | 139.766051 |
| 2018/2/1 10:05 | 35.681236 | 139.764362 |
| 2018/2/1 10:07 | 35.680232 | 139.762944 |
| 2018/2/1 10:08 | 35.678161 | 139.762836 |
| 2018/2/1 10:08 | 35.680496 | 139.762686 |
| 2018/2/1 10:10 | 35.681301 | 139.762971 |

 

locDataには各時刻の緯度経度データしかないため、ここに前時刻の緯度経度のカラムを追加することで、レコードごとに前時刻との距離を求めることができます。

まずウィンドウ関数を次のように定義します。

 

|scala|
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val w = Window.orderBy(“timestamp”)
||<

 

今回は1人のユーザ分のデータを使うためtimestamp順にデータを並べるだけですが、複数ユーザがいる場合は以下のようにパーティションを切ってウィンドウを定義します。

 

|scala|
val w = Window.partitionBy(“userId”).orderBy(“timestamp”)
||<

 

上記で定義したウィンドウ関数を使って、前レコードの緯度経度カラムを次のように追加します。

 

|scala|
val locDataWithPrev = locData.withColumn(“prev_lat”, lag($”lat”, 1).over(w))
.withColumn(“prev_lon”, lag($”lon”, 1).over(w))
locDataWithPrev.show()
||<
| timestamp      | lat       | lon        | prev_lat  | prev_lon   |
| 2018/2/1 10:00 | 35.680923 | 139.766051 | null      | null       |
| 2018/2/1 10:05 | 35.681236 | 139.764362 | 35.680923 | 139.766051 |
| 2018/2/1 10:07 | 35.680232 | 139.762944 | 35.681236 | 139.764362 |
| 2018/2/1 10:08 | 35.678161 | 139.762836 | 35.680232 | 139.762944 |
| 2018/2/1 10:08 | 35.680496 | 139.762686 | 35.678161 | 139.762836 |
| 2018/2/1 10:10 | 35.681301 | 139.762971 | 35.680496 | 139.762686 |

 

これでデータの下準備は終わりです。このデータにUDFを適用することで2点間の距離を求めていきます

 

Spark UDFで2点間の距離を求める

SparkにはScala spark, PysparkともにUDF(User Defined Functions)という概念があります。Spark UDFとは、Spark上でユーザが自由に定義することができる関数を表しています。ウィンドウ関数のlagはあらかじめ定義されていた関数であるのに対して、UDFでは”lat”, “lon”, “prev_lat”, “prev_lon”の4カラムを入力として2点間の距離を出力するような複雑な関数を実装することができます。

まずは、2点の緯度経度を入力として距離を求めるScalaメソッドを次のように定義します。ここでは、haversine formulaという式を使って2点間の距離を求めます。(参考: https://en.wikipedia.org/wiki/Haversine_formula )

今回は分かりやすくするために、簡易な実装で書いています。

 

|scala|
def getDistance(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
val r: Long = 6371
val lat_dist = scala.math.toRadians(lat1) – scala.math.toRadians(lat2)
val lon_dist = scala.math.toRadians(lon1) – scala.math.toRadians(lon2)
val h = scala.math.pow(scala.math.sin(lat_dist/2), 2) + scala.math.cos(lat1) * scala.math.cos(lat2) * scala.math.pow(scala.math.sin(lon_dist/2), 2)
scala.math.asin(scala.math.sqrt(h)) * 2 * r * 1000 // メートル
}
||<

 

次に、このメソッドをUDFとして定義します。このステップを行うことで、定義した関数を自由に呼び出すことができます。

 

|scala|
import org.apache.spark.sql.functions.udf
val getDistUdf = udf(getDistance _)
||<

 

最後に各レコードに対して前時刻のレコードとの距離を求めた結果のカラムを追加することで、求めたかった2点間の距離を求めることができます。

 

|scala|
val locDataWithDist = locDataWithPrev.withColumn(”dist”, getDistUdf($”lat”, $”lon”, $”prev_lat”, $”prev_lon”))
locDataWithDist.show()
||<
| timestamp      | lat       | lon        | prev_lat  | prev_lon   | dist    |
| 2018/2/1 10:00 | 35.680923 | 139.766051 | null      | null       | null    |
| 2018/2/1 10:05 | 35.681236 | 139.764362 | 35.680923 | 139.766051 | 156.472 |
| 2018/2/1 10:07 | 35.680232 | 139.762944 | 35.681236 | 139.764362 | 169.902 |
| 2018/2/1 10:08 | 35.678161 | 139.762836 | 35.680232 | 139.762944 | 230.471 |
| 2018/2/1 10:08 | 35.680496 | 139.762686 | 35.678161 | 139.762836 | 259.993 |
| 2018/2/1 10:10 | 35.681301 | 139.762971 | 35.680496 | 139.762686 | 93.14   |

 

まとめ

Scala SparkのUDFとウィンドウ関数を使って緯度経度データから2点間の距離を求めるコードを実装しました。今回は位置情報データを扱う際の実装をイメージしていただくために簡単なお題をご紹介しました。実際に位置情報データを扱う際には大量のデータを扱うことになるため、パフォーマンスを考慮した実装を意識する必要があるというところに難しさと同時に面白さがあります。今回は位置情報データを分析するまでの前準備が主なトピックとなってしまいましたが、位置情報データをこういう風に扱ったほうがいいといったアイデアがあればぜひご意見を頂ければと思います。

※本記事で使われている位置情報データは全てサンプルデータです。