SparkのShuffleについて調べてみる (4:Shuffle Readの実装探検)
前回の記事では、SparkのShuffle Writeの実装を追ってみました。
今回は、Shuffle Readの実装について調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)
Shuffle Read
Shuffle Readは、Shuffle Writeにより書き出されたデータを読み出す処理です。処理するタスクの前段のステージがShuffleステージである場合に行われます。
Shuffle Readは、Sort ShuffleでもHash Shuffleでも同じクラス(BlockStoreShuffleReader
)が使用されます。
データのフェッチ
BlockStoreShuffleReader
は、ShuffleBlockFetcherIterator
を使用して対象のデータ(ブロックIDとデータストリーム)を逐次取得します。この時、リモートにあるデータを一度にフェッチするデータの最大はspark.reducer.maxSizeInFlight
により制御されます。この値が大きいほどフェッチの効率はよくなりますが、メモリへのプレッシャーが増します。実際には、最大で5ノードへ同時にリクエストを送るためにこの値を5で割った数を上限としているようですが、どこでその「最大5ノード」という制限をしているかは不明でした。
どのBlockManager
からどのBlockIDを抜けばいいかは、MapOutputTracker
というmapタスクの出力を管理するオブジェクトが教えてくれます。
また、Shuffle Writeの方式によってデータの書き出され方は異なりますが、ここの違いはBlockManager
の中でShuffleBlockResolver
により吸収されています。例えば、Sort Shuffleの場合は前回紹介したIndexShuffleBlockResolver
が使用されます。
フェッチされた各データは解凍とdesrializeがされ、キーに応じた集約が行われます。Map Side Combineが行なわれている場合はマージされた結果同士のマージ、そうでない場合は個々の値のマージが行われます。
データのマージ
マージにはExternalAppendOnlyMap
というオブジェクトが使用されます。このオブジェクトは値のマージ方法を知っており、データを外部から受け入れてマージされた結果を返します。データが一定の水準に達するとディスクに退避するところが特徴的です。Shuffle Writeで登場したExternalSorter
もデータがメモリに収まらない場合はディスクに退避する機能を持っていますが、どちらも同じSpillable
というtraitで実現しています。
また、ExternalSorter
でもデータの保持にはPartitionedAppendOnlyMap
とうオブジェクトを使用しており、先のExternalAppendOnlyMap
と同じ親(AppendOnlyMap
)を持ちます。AppendOnlyMap
ではKey-ValueのデータがシンプルなArray
として記録されるオープンアドレスのハッシュテーブルです。基本的にはキーが入っているインデックスの一つ後ろに値が入っているようです。
キーのポジションは、キーのハッシュ値とQuadratic Probingというキーの衝突回避の手法により決定されているようです。
こうしてマージされたデータがBlockStoreShuffleReader
から返却されます。sortByKey
のようなソート順序の指定がある場合、ExternalSorter
によりソートが行われた上で返却されます。
以上です。ExternalAppendOnlyMap
でspillしたデータとオンメモリのデータをマージするところなど、まだまだ深く潜り込めるポイントはたくさんありそうですが、ひとまずこの辺で。