読者です 読者をやめる 読者になる 読者になる

The Dabsong Conshirtoe

技術系の話を主にします。

Spark1.6 - Datasetの導入とメモリ管理の変更

spark1.6がリリースされました。

Spark 1.6.0 released | Apache Spark

詳しい変更点は上記を見ていただくとして、ここではDatasetとメモリ管理方式の変更について紹介します。

メモリ管理の変更

Spark1.6ではメモリ(ヒープ)の管理が柔軟になり、より効率的にメモリを使用できるようになりました。

1.5までの世界

Sparkではメモリの使用領域が主に2つの領域に分かれています。

ストレージ領域

RDDをメモリにキャッシュする際に使われる領域です。

実行領域

シャッフルなど、タスクの実行に使われる領域です。

従来、この2つの領域は厳密に分断されていたため、チューニングを誤ると全体としてはメモリが余っているにもかかわらず、無駄なディスク書き込みが発生するなどしてしまいました。

1.6からの世界

1.6ではこの管理方式に変更が加えられ、お互いがお互いの領域を使用することが可能になりました。

また、ストレージ領域がspark.memory.storageFractionを超えてメモリを使用している場合、実行領域はストレージ領域の使用メモリがspark.memory.storageFractionを上回っている限り横取りすることができるそうです。(逆は不可能)

重要な設定値

spark.memory.fraction

JVMのうち、Sparkが使用できるメモリの割合を示す。

spark.memory.storageFraction

spark.memory.fractionのうち、ストレージ領域に最低どれだけ確保できるかを示す。

spark.memory.useLegacyMode

trueに設定することで、1.5以前の古いメモリ管理方法を採用できる。

Dataset API

1.6で初めて追加された試験的な機能です。pythonは未サポート。

Datasetを使うことで、素のRDDがもたらす静的な型チェックと、DataFrameがもたらす実行の最適化の両方の恩恵が受けられます。

型付けについて詳しくみてみましょう。

静的な型チェック

静的な型チェックが得られるとはどういうことでしょうか?このメリットを理解するには、まずDataFrameの問題点を理解する必要があります。

DataFrameの例

例えば、名前(String)と年齢(Long)の2つのフィールドを持つPersonスキーマを外部から読み込むとしましょう。(このjsonは、sparkのexapmleディレクトリに含まれているものです)

scala> case class Person(name: String, age: Long)
scala> val path = "examples/src/main/resources/people.json"
scala> val people: DataFrame = sqlContext.read.json(path)

次は、このpeopleからage列のみを取り出すメソッドを定義したいとします。

scala> def collectAge(d: DataFrame): DataFrame = d.select($"age")

一見問題ないように見えますが、実はここに危険があります。

まず、戻ってくる型がDataFrameという非常に抽象的な型であるため、実装をよく見ないと実際どのようなスキーマDataFrameが戻ってきているかわからず、当然静的な型チェックも効きません。

例えば以下のような定義はすべてコンパイルを通過します。

// ageではなくnameを返してしまっているが、コンパイルできる
scala> def collectAge(d: DataFrame): DataFrame = d.select($"name")
 
// 存在しないフィールドを得るメソッドもコンパイルは通過する
scala> def collectAge(d: DataFrame): DataFrame = d.select($"xxx")
// 実行時にエラーとなる。
scala> collectAge(people)
org.apache.spark.sql.AnalysisException: cannot resolve 'xxx' given input columns age, name;

DataFrameに対する操作が複雑になればなるほど、今扱っているDataFrameが一体どのようなものなのか、正しくスキーマを守れているのか、といったことがわかなくなります。

Datasetの例

Datasetはこれを解決します。

まずはDatasetを作ってみましょう。DataFrame.asによりDatasetに変換できます。

scala> val people: Dataset[Person] = sqlContext.read.json(path).as[Person]

次に、DataFrameの例のときと同様にageを取り出す関数の定義です。

scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.age)

見てお分かりだと思いますが、戻り値のDatasetがどのような型を持っているか明示されています。

これにより、DataFrameの時に問題になった間違ったメソッド定義はすべてコンパイル時にチェックされます。

// 戻り値の型と一致しないのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
<console>:32: error: type mismatch;
 found   : String
 required: Long
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
 
// Personに定義されていないフィールドを参照しようとしているのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)
<console>:32: error: value xxx is not a member of Person
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)

これにより、SparkSQLの最適化の恩恵を受けながら、静的な方チェックの恩恵を受けることにより、安全にプログラムを書くことが可能になりました。めでたしめでたし。