STASH LIST

いい感じに蓄えた技術的なことを吐き出すところ。もしくは独り言

Apache Sparkで意図的にCache/Persistをしないと期待した結果にならないこともある

TL;DR

Apache Sparkを使うとき、一つのSpark Applicationが動く前提としている状態(Dataset)はCacheかPersistしておいたほうがいい。

Sparkが内部でどういう動きをしているのかというのをわからずに使っていると、こちらの予想とは違う挙動をして期待した結果を得られないことがある、
ということを実際にハマったケースを使って書いてみます。

ハマったときに感じたのは、Sparkを使うときは、

  1. 遅延実行される
  2. 一つのSpark Application内で複数のActionがあれば、Jobは複数になる
  3. RDD/Datasetは、メモリになければ再計算される(バックワードで)

ということをおさえられている必要があるなぁというところです。

お試しアプリの前提

RDBに、以下のようなUserテーブルがあり1件データが入っているとします。

要は、こうです。

id versionNum name expired
1 1 TEST1 FALSE

このデータを履歴を残しながら、更新していくことを考えます。
例えば、id = 1のレコードのnameをUPDATED NAMEに更新するとすると

id versionNum name expired
1 1 TEST1 TRUE
1 2 UPDATED NAME FALSE

というふうに、レコードを残していくとします。 つまり、更新データが来た場合は、

  1. 前のデータのexpiredをTRUEにUpdateする
  2. 更新データのversionNumをインクリメントして、expiredはFALSEとしてInsertする

こととします。

お試しアプリ

動くコードはGithubに置いてあります。

github.com

Cacheのある/なしによる挙動の違い

contents for blog in 2018 Nov 24th

上のコードは、Githubに実際に置いてあるコードからCacheを抜いたものになります。
これと、Githubに置いてあるCacheが書いてあるコードでは、挙動が変わります。

Cacheがないコードで動かすと、結果は以下のようになってしまい、期待した動作になりません。

id versionNum name expired
1 1 TEST1 TRUE
1 2 UPDATED NAME TRUE

なんでこうなってしまうのか、、、というと、Sparkは

遅延実行される

Sparkのライフサイクル(と言っていいのかちょっとわかりませんが)は、大きく分けると

  • Transform
  • Action

の2つになります。
Transformは、Datasetに対するmapのように、Datasetの構造を変化させる動作を指し、Actionは副作用を伴う動作を指します。
もしJava 8 を触ったことがある方なら、Stream APIを思い出してもらえればわかるかと思います。 あれも、終端動作をしないと実際の処理は走りませんよね?
Sparkも同じです。 Transformを何回実行したとしても、実際にActionが行われなければ処理は走りません。
今回のコードで見てみると、Actionは、

saveTable(User.tableName, persistData.map(_._1))
update(persistData.map(_._2))

の2つがあります。

一つのSpark Application内で複数のActionがあれば、Jobは複数になる

今回のSpark ApplicationはActionが2つあるので、Jobは2つ以上になっています。 (2つじゃないの?という話は別記事にします)
正確ではないですが、イメージとしては、

1つ目のJob

f:id:pe_suke:20181124161307p:plain

2つ目のJob

f:id:pe_suke:20181124161344p:plain

そう、どちらのJobもRDBからデータを取得するところがスタートになっています。

RDD/Datasetは、メモリになければ再計算される(バックワードで)

今回のアプリケーションでは2つ以上Jobがあるので、それぞれ順番に実行されていきます。
なので1つ目のJobによって、データがInsertされたあとに、2つ目のJobが動いてデータのUpdateが行われます。
そして、それぞれのJobはRDBからのデータ取得するところがスタートです。
というわけで、1つ目のJobによって、データがInsertされた後、2つ目のJobがRDBからデータを取得してJoinとUpdateを実行するため、 結果が期待した通りにならなくなってしまいます。
今回のアプリで期待している前提は、どちらのJobもRDBの前提とする状態は前述したレコードが1件入っていることであり、
他のJobによって書き換えられた状態を前提としていないのでおかしなことになってしまいます。

なので、1つ目のJobが動いたときに、RDBから読み出されたときの状態をCacheないしPersistしておくことで、2つ目のJobがRDBを見に行くのではなく、Cacheされたデータを見に行くので、どちらのJobも同じ前提で動かすことができます。

まとめ

Apache Sparkは書いたコードを上から順番に素直に実行されるというわけではないので、内部的な動きはある程度理解してから臨むことをオススメします。