The Dabsong Conshirtoe

技術系の話を主にします。

SparkのShuffleについて調べてみる (4:Shuffle Readの実装探検)

前回の記事では、SparkのShuffle Writeの実装を追ってみました。

今回は、Shuffle Readの実装について調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

Shuffle Read

Shuffle Readは、Shuffle Writeにより書き出されたデータを読み出す処理です。処理するタスクの前段のステージがShuffleステージである場合に行われます。

Shuffle Readは、Sort ShuffleでもHash Shuffleでも同じクラス(BlockStoreShuffleReader)が使用されます。

データのフェッチ

BlockStoreShuffleReaderは、ShuffleBlockFetcherIteratorを使用して対象のデータ(ブロックIDとデータストリーム)を逐次取得します。この時、リモートにあるデータを一度にフェッチするデータの最大はspark.reducer.maxSizeInFlightにより制御されます。この値が大きいほどフェッチの効率はよくなりますが、メモリへのプレッシャーが増します。実際には、最大で5ノードへ同時にリクエストを送るためにこの値を5で割った数を上限としているようですが、どこでその「最大5ノード」という制限をしているかは不明でした。

どのBlockManagerからどのBlockIDを抜けばいいかは、MapOutputTrackerというmapタスクの出力を管理するオブジェクトが教えてくれます。

また、Shuffle Writeの方式によってデータの書き出され方は異なりますが、ここの違いはBlockManagerの中でShuffleBlockResolverにより吸収されています。例えば、Sort Shuffleの場合は前回紹介したIndexShuffleBlockResolverが使用されます。

フェッチされた各データは解凍とdesrializeがされ、キーに応じた集約が行われます。Map Side Combineが行なわれている場合はマージされた結果同士のマージ、そうでない場合は個々の値のマージが行われます。

データのマージ

マージにはExternalAppendOnlyMapというオブジェクトが使用されます。このオブジェクトは値のマージ方法を知っており、データを外部から受け入れてマージされた結果を返します。データが一定の水準に達するとディスクに退避するところが特徴的です。Shuffle Writeで登場したExternalSorterもデータがメモリに収まらない場合はディスクに退避する機能を持っていますが、どちらも同じSpillableというtraitで実現しています。

また、ExternalSorterでもデータの保持にはPartitionedAppendOnlyMapとうオブジェクトを使用しており、先のExternalAppendOnlyMapと同じ親(AppendOnlyMap)を持ちます。AppendOnlyMapではKey-ValueのデータがシンプルなArrayとして記録されるオープンアドレスのハッシュテーブルです。基本的にはキーが入っているインデックスの一つ後ろに値が入っているようです。

キーのポジションは、キーのハッシュ値とQuadratic Probingというキーの衝突回避の手法により決定されているようです。

こうしてマージされたデータがBlockStoreShuffleReaderから返却されます。sortByKeyのようなソート順序の指定がある場合、ExternalSorterによりソートが行われた上で返却されます。

以上です。ExternalAppendOnlyMapでspillしたデータとオンメモリのデータをマージするところなど、まだまだ深く潜り込めるポイントはたくさんありそうですが、ひとまずこの辺で。