The Dabsong Conshirtoe

技術系の話を主にします。

SparkのShuffleについて調べてみる (3:Shuffle Writeの実装探検)

前回の記事では、SparkのShuffleについて、Physical Planから見た内容についてまとめました。

今回は、実行時の観点からのShuffle Writeについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

実行時のShuffleの流れ

Shuffleはどのように実現されているのかを簡単に見ると、以下の流れとなります。

  1. 各TaskがShuffleのキーごとにデータをファイルに書き出す(Shuffle Write)
  2. reducerごとに担当するキーのデータファイルを読み込み、処理を行う(Shuffle Read)

Shuffle Write

もう少し詳細に流れを追うと、以下の流れとなります。

  1. ShuffleMapTaskが起動されます。
  2. RDDのデータを読み込む。この時、前段のステージがShuffleの場合、ShuffleReaderからデータを取得します(Shuffle Read)。そうでない場合、入力ソースからデータを読み込みます。
  3. 得たデータをShuffleWriterを使って出力します。

ShuffleMapTaskの起動について

まず、TaskにはShuffleMapTaskとResultTaskの2種類の実装があります。ResultTaskは、foreachやcollectのような、ジョブの最終ステージを実行するタスクです。一方、ShuffleMapTaskはそれ以外のステージで実行されるタスクです。

DAGSchedulerでTaskを生成する際、対象のステージがShuffleMapStageである場合に生成されているようです。

ShuffleWriterを使った出力について

ShuffleReaderを使った読み込みについては後半で説明するので一旦スキップします。

ShuffleWriterによる出力について

どのようにデータをShuffleするかについて、Sparkではhash, sort, tungsten-sortの3つの実装がされています。どの実装を選択するかは、spark.shuffle.managerにより制御できます。

Hash Shuffle

Spark 1.2.0以前のデフォルト実装です。各mapタスクが各reducer(Shuffleされたデータを受け取る側)ごとにファイルを作成します。ただ、この方式ではMap Taskの数とReducerの数を掛け合わせた量だけファイルが生成されてしまうため、各ファイルを作ったり消したりするところでのオーバーヘッドが大きく、特にパーティション数が大量にある場合に問題なるケースが多かった様です。

spark.shuffle.consolidateFilesという、この問題に対処するオプションも1.6.0以前のバージョンでは存在しましたが、後述のsort shuffleの存在等の理由により削除されています。

Sort Shuffle

spark.shuffle.managerのデフォルト値です。上記のHash Shuffleの問題に対処するために導入されました。各Mapタスクごとにパーティション、キーでソートされたデータを書き出します。同時に、各ファイルごとにどのパーティションがどこから始まるかわかるようにインデックスファイルを別で書き出します。各Reducerは、このインデックスファイルを頼りに必要なパーティションのデータを読む形になります。

具体的に実装を見ていくと、この一連の処理を担うのが、SortShuffleWriterです。SortShuffleWriterは、ExternalSorterを使ってデータの書き込みを行います。

ExternalSorterでは、MapSideCombineが有効な場合、データはPartitionedAppendOnlyMapを使ってキーごとにマージされた形式でオンメモリに保持されます。内部で持つデータサイズが一定量に達した場合、ExternalSorterはspillという処理により、現在のデータをファイルに書き出し、新しいPartitionedAppendOnlyMapを生成します。この時の書き込みバッファはspark.shuffle.file.bufferspark.shuffle.spill.batchSizeにより制御されているようです。

spillにより作成されたファイルは、すべてマージソートされて一つのファイルとして改めて書き出されます。spillされたファイルは書き込み処理完了後に削除されます。

ファイルをすべて書き出したら、IndexShuffleBlockResolverを使ってインデックスファイルを書き出します。

spillの存在によりオンメモリに乗り切らないデータも安全にシャッフルできるのですが、余計なIOやマージソートが発生するため、処理速度になかなかのインパクトを及ぼします。(個人的にも、spillを削減することで処理速度が数倍早くなったケースがありました。)また、ディスクに小さいファイルを書きまくるのでディスクも圧迫します。spillの情報はSparkのUIから閲覧可能ですので、チューニングの際には参考にすると良いでしょう。

ByPass Merge Sort Shuffle

SortShuffleWriterが使われる設定で以下の条件を満たした時、BypassMergeSortShuffleWriterという特殊な実装が使用されます。

旧来のHash Shuffleと同じく、パーティションの数だけファイルを作成し、最後にそれらのファイルのマージと、パーティションの位置を示すためのインデックスファイルを作るだけというシンプルなものです。

SortShuffleWriteと異なり、データをインメモリに保持する必要がなく、spillも発生しないため効率的です。反面、Hash Shuffleと同じく大量のファイルが作成される恐れがあるため、少ないパーティション数の場合のみ有効といえます。

Tungsten Sort Shuffle

Project Tungstenの一環で、オフヒープを使ったより効率的な実装です。spark.shuffle.managertungsten-sortを指定することで有効になる、という記述もいくつか見ますが、実際にコードを見てみるとsortでもtungsten-sortでも同じ実装(SortShuffleManager)が使われていました。(ここは単に私の読み間違いかもしれません)

実装を見る限り、以下の条件を満たす事でUnsafeShuffleWriterが使用されるようです。

  • シリアライズされたオブジェクトを整列した結果が、シリアライズする前に整列した結果と一致すること(ちゃんと理解できでるか微妙です。ドキュメントを見る限り、シリアライズしたオブジェクトのバイトストリームに対して、ソートなどして順序を変更することが可能か、ということでしょうか)
  • Shuffleの処理に集約を含まない
  • パーティションの数が16,777,216より少ない

内部ではShuffleExternalSorterが使われています。Sort Shuffleで出てきたExternalSorterと同じく、内部のレコードはパーティションIDでソートされ、単一のファイルに出力が行われます。

この実装ではserializeされたオブジェクトを直接扱うため、Sort Shuffleで必要とされたserialize/desrializeの処理が不要となります。spillのマージには、FileStreamを使うバージョンとNIOのFileChannelを使うバージョンがあるようです。

また、データはオフヒープに格納され、そのアドレスとパーティションIDがShuffleMemorySorterという専用のSorterに格納されます。アドレスとパーティションIDは単一のLong型に変換されるため、1レコードで使うサイズは8バイト(!)です。

データを書き出す際は、TimSortにより内部の先ほどのデータがソートされます。