The Dabsong Conshirtoe

技術系の話を主にします。

SparkのShuffleについて調べてみる (1:概要とLogical Plan)

Sparkでは、reduceByKeyやgroupByKeyのような特定の処理を行う場合、シャッフルと呼ばれる処理が実行されます。 これの詳細について自分なりにある程度詳細な理解がしたかったのでまとめてみます。

Sparkのバージョンは1.6.1です。

SparkでのShuffleの概要

SparkにおけるShuffleは、あるRDDに対して操作を行う際にデータの再分散を行うことといえます。

例えば、mapは各RDDに対してmap処理を行うことで事足ります。この時、データの移動は必要ないのでShuffleは発生しません。(図の青い箱がRDD、丸い箱がパーティションです)

f:id:Attsun_1031:20160325001132p:plain

一方、reduceByKeyでは、指定されたキーでの集計が必要となるため、クラスタ上に散らばっている各RDDをキーごとにまとめる必要があり、この時にネットワーク越しのデータ転送、つまりShuffleが行なわれます。

f:id:Attsun_1031:20160325001649p:plain

Shuffleはネットワーク越しのデータ転送が行なわれますので、特に巨大なデータに対してShuffleをする場合は大量のディスクIOやネットワークIOが発生することになるので、パフォーマンスに影響を与えやすいです。

Sparkアプリケーションの実行フロー

Sparkアプリケーションはどのようにして実行されるのでしょう?大まかに分けると、3つのフェーズから成り立ちます。

  1. プログラムから、RDDの依存グラフ(DAG)を作成(Logical Plan)
  2. 依存グラフをステージやタスクといった単位に分解(Physical Plan)
  3. Physical Planに沿って実行

この各フェーズについて、Shuffleがどのように扱われているかを見ていきたいと思います。

Logical Planから見たShuffle

まず、Logical PlanでのShuffleについて見てみましょう。

Logical Planでは、RDDの変換処理からDAGを生成します。例えばmap処理の場合、先ほどの最初の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001132p:plain

一方、Shuffleが伴う処理がおこなれた場合、2つ目の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001649p:plain

この時、SparkではDAGの他に、RDD間にどのような依存関係があるかという情報を保持しています。(RDDクラスのdependenciesメソッドにより取得できます)

依存関係には大きく分けて2種類あります。

NarrowDependency

子のRDDの各パーティションが、親のRDDのどれかのパーティション全てに依存している

ShuffleDependency

子のRDDの各パーティションが、親のRDDの複数のパーティションの一部に依存している

名前から見てわかる通り、Shuffleを伴う操作がある場合、その親との依存関係はShuffleDependencyとなります。

「一部に依存している」とは、例えばreduceByKeyのようにキーごとに集計する場合は同じキーのデータが同じパーティションに入るようにする必要があるので、親の各パーティションからそのキーのデータのみ(つまり一部)を抜き出して新しいパーティションに引っ張ってくる、ということを意味します。

以下のリンクの右下の図が、ShuffleDependencyです。図にするとわかりやすいですね。

https://github.com/JerryLead/SparkInternals/blob/master/PNGfigures/Dependency.png

ShuffleDependencyは、Shuffleされたデータをどのように集約するかやmap side combine(データ転送の前にローカルのデータのみで集計を済ませておくことでネットワーク転送量を減らす仕掛け)を行うか、といった情報を保持しています。

依存関係が構築される例

例えば、map処理は(NarrowDependencyの子クラスである)OneToOneDepenencyを持ちます。

scala> sc.parallelize(Array(1,2,3)).map(_ + 1).dependencies
res19: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@6a11e139)

一方、reduceByKeyにより生成されるRDDはShuffleDependencyを持ちます。

scala> sc.parallelize(Array(("x", 1), ("y", 2), ("x", 3))).reduceByKey(_ + _).dependencies
res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1a47ddfb)

mapのように必ずどの依存関係を持つか決まっているものもありますが、例えばjoinはpartitionerが同じならNarrowDependencyになりますし、そうでないならShuffleDependencyになります。これは、もし2つのRDDが同じようにパーティショニングされているならデータを再分散する必要がないからです。