読者です 読者をやめる 読者になる 読者になる

The Dabsong Conshirtoe

技術系の話を主にします。

SparkのShuffleについて調べてみる (2:Physical Plan)

前回の記事では、SparkのShuffleについての概要と、Logical Planから見たShuffleについてまとめました。

今回は、Physical Planから見たShuffleについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

Physical Planについて

Logical Planで作成したDAGを、実際にどのような粒度のタスクに分解し、どのように実行するのかを定めたものがPhysical Planです。

DAGは、いくつかのStageに分解され、さらにそのStageではRDDパーティションごとにTaskという単位で処理が実行されます。

Physical PlanにおけるShuffle

Physical Planにおいて、ShuffleはStageとStageを分断する境界線となります。

MapReduceと異なり、Sparkでは複数の処理をまとめて1つの処理として実行することが可能です。例えば、以下のようなコードを実行するとしましょう。

rdd.map(x => x + 1).filter(x < 0).map(x ** 2)

map, filter, mapという3つの処理がありますが、SparkではMapReduceのようにそれぞれの処理の中間データについて保持する必要がなく、1つの処理としてまとめて一気に一つのタスクとして実行します。このまとめられた単位がStageで、それぞれのパーティションごとに実行される個々の処理がTaskです。

一方、Shuffleを伴う処理では話が異なります。

rdd.map(x => (x, x * 2)).mapValues(x => x + 3).reduceByKey(_ + _).filter((x,y) => y > 1)

この場合、mapValuesまではまとめることができますが、reduceByKeyではパーティションをまたいだデータのマージ処理が行われるため、reduceByKey以降の処理を1つにまとめることはできません。逆に、mapやfilterはデータがパーティションをまたぐことがないので、ローカルで完結する処理として一つにまとめることができるのです。

reduceByKeyに限らず、ShuffleDependencyを作る処理ではすべて同じ制約があるため、ShuffleはStageを分断する境界線となります。

実装を追う

mapやfilterの処理が1つのTaskとしてまとめられるような、Shuffleがない場合に複数の処理を1つにまとめる処理について、どのように実装されているのか軽く調べてみました。

例として、以下の処理を考えてみます。

sc.parallelize(Array(1,2,3)).map(x => x + 1).filter(x > 3).collect()

map, filterの多段処理になっています。

map, filterの実装を見ると、どちらもMapPartitionRDDクラスを生成しています。

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

また、生成時に自分自身を親として登録しています。mapやfilterに渡した関数を実行する関数の登録も行っています。Sparkは遅延実行なので、この時点ではRDDオブジェクトのチェーンが構築されるのみです。

map, filterを実行したところで、以下のようなチェーンのRDDが構築されています。

scala> r.toDebugString
res1: String =
(4) MapPartitionsRDD[2] at filter at <console>:27 []
 |  MapPartitionsRDD[1] at map at <console>:27 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

このRDDに対してcollectを実行すると実際の計算が行われます。実際にRDDの計算が始まるまでをcollectメソッドから一つ一つ追っていくのは大変ですが、結果的にはRDD.computeがコールされます。computeはイテレータを返します。

MapPartitionRDDクラスはcomputeを以下のような実装でオーバーライドしています。

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

firstParentとは、このRDDの親にあたるRDDです。filterから見ればmap実行時に作成されたRDD、mapから見ればparallelizeで作成されたRDDです。

fはMapPartitionRDD作成時に登録した関数で、最後の引数(イテレータ)に対して関数を適用します。この対象のイテレータは見ての通り親RDDiteratorメソッドにより作成されるものですが、内部では親RDDのcomputeが実行されます。

こうして、末端の子からその親をたどっていき、イテレータに対する処理のチェーンを作ることで、中間データを必要としない1つの処理としてまとめあげているようでした。