Spark1.6 - Datasetの導入とメモリ管理の変更
spark1.6がリリースされました。
Spark 1.6.0 released | Apache Spark
詳しい変更点は上記を見ていただくとして、ここではDatasetとメモリ管理方式の変更について紹介します。
メモリ管理の変更
Spark1.6ではメモリ(ヒープ)の管理が柔軟になり、より効率的にメモリを使用できるようになりました。
1.5までの世界
Sparkではメモリの使用領域が主に2つの領域に分かれています。
ストレージ領域
RDDをメモリにキャッシュする際に使われる領域です。
実行領域
シャッフルなど、タスクの実行に使われる領域です。
従来、この2つの領域は厳密に分断されていたため、チューニングを誤ると全体としてはメモリが余っているにもかかわらず、無駄なディスク書き込みが発生するなどしてしまいました。
1.6からの世界
1.6ではこの管理方式に変更が加えられ、お互いがお互いの領域を使用することが可能になりました。
また、ストレージ領域がspark.memory.storageFraction
を超えてメモリを使用している場合、実行領域はストレージ領域の使用メモリがspark.memory.storageFraction
を上回っている限り横取りすることができるそうです。(逆は不可能)
重要な設定値
spark.memory.fraction
JVMのうち、Sparkが使用できるメモリの割合を示す。
spark.memory.storageFraction
spark.memory.fraction
のうち、ストレージ領域に最低どれだけ確保できるかを示す。
spark.memory.useLegacyMode
trueに設定することで、1.5以前の古いメモリ管理方法を採用できる。
Dataset API
1.6で初めて追加された試験的な機能です。pythonは未サポート。
Dataset
を使うことで、素のRDD
がもたらす静的な型チェックと、DataFrame
がもたらす実行の最適化の両方の恩恵が受けられます。
型付けについて詳しくみてみましょう。
静的な型チェック
静的な型チェックが得られるとはどういうことでしょうか?このメリットを理解するには、まずDataFrame
の問題点を理解する必要があります。
DataFrameの例
例えば、名前(String)と年齢(Long)の2つのフィールドを持つPerson
スキーマを外部から読み込むとしましょう。(このjsonは、sparkのexapmleディレクトリに含まれているものです)
scala> case class Person(name: String, age: Long) scala> val path = "examples/src/main/resources/people.json" scala> val people: DataFrame = sqlContext.read.json(path)
次は、このpeople
からage列のみを取り出すメソッドを定義したいとします。
scala> def collectAge(d: DataFrame): DataFrame = d.select($"age")
一見問題ないように見えますが、実はここに危険があります。
まず、戻ってくる型がDataFrame
という非常に抽象的な型であるため、実装をよく見ないと実際どのようなスキーマのDataFrame
が戻ってきているかわからず、当然静的な型チェックも効きません。
例えば以下のような定義はすべてコンパイルを通過します。
// ageではなくnameを返してしまっているが、コンパイルできる scala> def collectAge(d: DataFrame): DataFrame = d.select($"name") // 存在しないフィールドを得るメソッドもコンパイルは通過する scala> def collectAge(d: DataFrame): DataFrame = d.select($"xxx") // 実行時にエラーとなる。 scala> collectAge(people) org.apache.spark.sql.AnalysisException: cannot resolve 'xxx' given input columns age, name;
DataFrame
に対する操作が複雑になればなるほど、今扱っているDataFrame
が一体どのようなものなのか、正しくスキーマを守れているのか、といったことがわかなくなります。
Datasetの例
Dataset
はこれを解決します。
まずはDataset
を作ってみましょう。DataFrame.as
によりDataset
に変換できます。
scala> val people: Dataset[Person] = sqlContext.read.json(path).as[Person]
次に、DataFrame
の例のときと同様にageを取り出す関数の定義です。
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.age)
見てお分かりだと思いますが、戻り値のDataset
がどのような型を持っているか明示されています。
これにより、DataFrame
の時に問題になった間違ったメソッド定義はすべてコンパイル時にチェックされます。
// 戻り値の型と一致しないのでエラー scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name) <console>:32: error: type mismatch; found : String required: Long def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name) // Personに定義されていないフィールドを参照しようとしているのでエラー scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx) <console>:32: error: value xxx is not a member of Person def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)
これにより、SparkSQLの最適化の恩恵を受けながら、静的な方チェックの恩恵を受けることにより、安全にプログラムを書くことが可能になりました。めでたしめでたし。