The Dabsong Conshirtoe

技術系の話を主にします。

SparkのShuffleについて調べてみる (3:Shuffle Writeの実装探検)

前回の記事では、SparkのShuffleについて、Physical Planから見た内容についてまとめました。

今回は、実行時の観点からのShuffle Writeについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

実行時のShuffleの流れ

Shuffleはどのように実現されているのかを簡単に見ると、以下の流れとなります。

  1. 各TaskがShuffleのキーごとにデータをファイルに書き出す(Shuffle Write)
  2. reducerごとに担当するキーのデータファイルを読み込み、処理を行う(Shuffle Read)

Shuffle Write

もう少し詳細に流れを追うと、以下の流れとなります。

  1. ShuffleMapTaskが起動されます。
  2. RDDのデータを読み込む。この時、前段のステージがShuffleの場合、ShuffleReaderからデータを取得します(Shuffle Read)。そうでない場合、入力ソースからデータを読み込みます。
  3. 得たデータをShuffleWriterを使って出力します。

ShuffleMapTaskの起動について

まず、TaskにはShuffleMapTaskとResultTaskの2種類の実装があります。ResultTaskは、foreachやcollectのような、ジョブの最終ステージを実行するタスクです。一方、ShuffleMapTaskはそれ以外のステージで実行されるタスクです。

DAGSchedulerでTaskを生成する際、対象のステージがShuffleMapStageである場合に生成されているようです。

ShuffleWriterを使った出力について

ShuffleReaderを使った読み込みについては後半で説明するので一旦スキップします。

ShuffleWriterによる出力について

どのようにデータをShuffleするかについて、Sparkではhash, sort, tungsten-sortの3つの実装がされています。どの実装を選択するかは、spark.shuffle.managerにより制御できます。

Hash Shuffle

Spark 1.2.0以前のデフォルト実装です。各mapタスクが各reducer(Shuffleされたデータを受け取る側)ごとにファイルを作成します。ただ、この方式ではMap Taskの数とReducerの数を掛け合わせた量だけファイルが生成されてしまうため、各ファイルを作ったり消したりするところでのオーバーヘッドが大きく、特にパーティション数が大量にある場合に問題なるケースが多かった様です。

spark.shuffle.consolidateFilesという、この問題に対処するオプションも1.6.0以前のバージョンでは存在しましたが、後述のsort shuffleの存在等の理由により削除されています。

Sort Shuffle

spark.shuffle.managerのデフォルト値です。上記のHash Shuffleの問題に対処するために導入されました。各Mapタスクごとにパーティション、キーでソートされたデータを書き出します。同時に、各ファイルごとにどのパーティションがどこから始まるかわかるようにインデックスファイルを別で書き出します。各Reducerは、このインデックスファイルを頼りに必要なパーティションのデータを読む形になります。

具体的に実装を見ていくと、この一連の処理を担うのが、SortShuffleWriterです。SortShuffleWriterは、ExternalSorterを使ってデータの書き込みを行います。

ExternalSorterでは、MapSideCombineが有効な場合、データはPartitionedAppendOnlyMapを使ってキーごとにマージされた形式でオンメモリに保持されます。内部で持つデータサイズが一定量に達した場合、ExternalSorterはspillという処理により、現在のデータをファイルに書き出し、新しいPartitionedAppendOnlyMapを生成します。この時の書き込みバッファはspark.shuffle.file.bufferspark.shuffle.spill.batchSizeにより制御されているようです。

spillにより作成されたファイルは、すべてマージソートされて一つのファイルとして改めて書き出されます。spillされたファイルは書き込み処理完了後に削除されます。

ファイルをすべて書き出したら、IndexShuffleBlockResolverを使ってインデックスファイルを書き出します。

spillの存在によりオンメモリに乗り切らないデータも安全にシャッフルできるのですが、余計なIOやマージソートが発生するため、処理速度になかなかのインパクトを及ぼします。(個人的にも、spillを削減することで処理速度が数倍早くなったケースがありました。)また、ディスクに小さいファイルを書きまくるのでディスクも圧迫します。spillの情報はSparkのUIから閲覧可能ですので、チューニングの際には参考にすると良いでしょう。

ByPass Merge Sort Shuffle

SortShuffleWriterが使われる設定で以下の条件を満たした時、BypassMergeSortShuffleWriterという特殊な実装が使用されます。

旧来のHash Shuffleと同じく、パーティションの数だけファイルを作成し、最後にそれらのファイルのマージと、パーティションの位置を示すためのインデックスファイルを作るだけというシンプルなものです。

SortShuffleWriteと異なり、データをインメモリに保持する必要がなく、spillも発生しないため効率的です。反面、Hash Shuffleと同じく大量のファイルが作成される恐れがあるため、少ないパーティション数の場合のみ有効といえます。

Tungsten Sort Shuffle

Project Tungstenの一環で、オフヒープを使ったより効率的な実装です。spark.shuffle.managertungsten-sortを指定することで有効になる、という記述もいくつか見ますが、実際にコードを見てみるとsortでもtungsten-sortでも同じ実装(SortShuffleManager)が使われていました。(ここは単に私の読み間違いかもしれません)

実装を見る限り、以下の条件を満たす事でUnsafeShuffleWriterが使用されるようです。

  • シリアライズされたオブジェクトを整列した結果が、シリアライズする前に整列した結果と一致すること(ちゃんと理解できでるか微妙です。ドキュメントを見る限り、シリアライズしたオブジェクトのバイトストリームに対して、ソートなどして順序を変更することが可能か、ということでしょうか)
  • Shuffleの処理に集約を含まない
  • パーティションの数が16,777,216より少ない

内部ではShuffleExternalSorterが使われています。Sort Shuffleで出てきたExternalSorterと同じく、内部のレコードはパーティションIDでソートされ、単一のファイルに出力が行われます。

この実装ではserializeされたオブジェクトを直接扱うため、Sort Shuffleで必要とされたserialize/desrializeの処理が不要となります。spillのマージには、FileStreamを使うバージョンとNIOのFileChannelを使うバージョンがあるようです。

また、データはオフヒープに格納され、そのアドレスとパーティションIDがShuffleMemorySorterという専用のSorterに格納されます。アドレスとパーティションIDは単一のLong型に変換されるため、1レコードで使うサイズは8バイト(!)です。

データを書き出す際は、TimSortにより内部の先ほどのデータがソートされます。

SparkのShuffleについて調べてみる (2:Physical Plan)

前回の記事では、SparkのShuffleについての概要と、Logical Planから見たShuffleについてまとめました。

今回は、Physical Planから見たShuffleについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

Physical Planについて

Logical Planで作成したDAGを、実際にどのような粒度のタスクに分解し、どのように実行するのかを定めたものがPhysical Planです。

DAGは、いくつかのStageに分解され、さらにそのStageではRDDパーティションごとにTaskという単位で処理が実行されます。

Physical PlanにおけるShuffle

Physical Planにおいて、ShuffleはStageとStageを分断する境界線となります。

MapReduceと異なり、Sparkでは複数の処理をまとめて1つの処理として実行することが可能です。例えば、以下のようなコードを実行するとしましょう。

rdd.map(x => x + 1).filter(x < 0).map(x ** 2)

map, filter, mapという3つの処理がありますが、SparkではMapReduceのようにそれぞれの処理の中間データについて保持する必要がなく、1つの処理としてまとめて一気に一つのタスクとして実行します。このまとめられた単位がStageで、それぞれのパーティションごとに実行される個々の処理がTaskです。

一方、Shuffleを伴う処理では話が異なります。

rdd.map(x => (x, x * 2)).mapValues(x => x + 3).reduceByKey(_ + _).filter((x,y) => y > 1)

この場合、mapValuesまではまとめることができますが、reduceByKeyではパーティションをまたいだデータのマージ処理が行われるため、reduceByKey以降の処理を1つにまとめることはできません。逆に、mapやfilterはデータがパーティションをまたぐことがないので、ローカルで完結する処理として一つにまとめることができるのです。

reduceByKeyに限らず、ShuffleDependencyを作る処理ではすべて同じ制約があるため、ShuffleはStageを分断する境界線となります。

実装を追う

mapやfilterの処理が1つのTaskとしてまとめられるような、Shuffleがない場合に複数の処理を1つにまとめる処理について、どのように実装されているのか軽く調べてみました。

例として、以下の処理を考えてみます。

sc.parallelize(Array(1,2,3)).map(x => x + 1).filter(x > 3).collect()

map, filterの多段処理になっています。

map, filterの実装を見ると、どちらもMapPartitionRDDクラスを生成しています。

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

また、生成時に自分自身を親として登録しています。mapやfilterに渡した関数を実行する関数の登録も行っています。Sparkは遅延実行なので、この時点ではRDDオブジェクトのチェーンが構築されるのみです。

map, filterを実行したところで、以下のようなチェーンのRDDが構築されています。

scala> r.toDebugString
res1: String =
(4) MapPartitionsRDD[2] at filter at <console>:27 []
 |  MapPartitionsRDD[1] at map at <console>:27 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

このRDDに対してcollectを実行すると実際の計算が行われます。実際にRDDの計算が始まるまでをcollectメソッドから一つ一つ追っていくのは大変ですが、結果的にはRDD.computeがコールされます。computeはイテレータを返します。

MapPartitionRDDクラスはcomputeを以下のような実装でオーバーライドしています。

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

firstParentとは、このRDDの親にあたるRDDです。filterから見ればmap実行時に作成されたRDD、mapから見ればparallelizeで作成されたRDDです。

fはMapPartitionRDD作成時に登録した関数で、最後の引数(イテレータ)に対して関数を適用します。この対象のイテレータは見ての通り親RDDiteratorメソッドにより作成されるものですが、内部では親RDDのcomputeが実行されます。

こうして、末端の子からその親をたどっていき、イテレータに対する処理のチェーンを作ることで、中間データを必要としない1つの処理としてまとめあげているようでした。

「詳解Apache Spark」刊行

技術評論社から「詳解Apache Spark」が刊行されました!

www.amazon.co.jp

今回、私は内容のレビューという形でこの本の出版に関わらせていただきました。

総評

Apache Sparkについてなかなか網羅的にまとめられている良本です。Sparkは情報が出てきているとはいえ、まだまだ英語ソースのものが多かったりします。また、バージョンアップのたびに様々な新機能の導入を行っており、情報が古くなりがちです。本書は1.6.0を対象に執筆されており、現時点の最新は1.6.1なのでほぼ最新の内容といえます。

Sparkの導入を考えているエンジニアはとりあえず手元に置いておいても損ないのではないでしょうか。

章立ては以下です。

  • 第1章 Sparkの基礎
  • 第2章 Sparkの導入
  • 第3章 RDD
  • 第4章 クラスタ構築と運用
  • 第5章 DataFrameとSpark SQL
  • 第6章 Spark Streaming
  • 第7章 MLlib
  • 第8章 GraphX
  • Appendix YARN

RDDのようなcoreの部分はもとより、SparkSQL, Spark Streaming, MLlib, GraphXといった主要コンポーネントすべてを網羅しています。さらに、それぞれについてサラッとではなく、基礎から応用までしっかりと説明されているのがとても良いです。

個人的に良かった章

4章「クラスタ構築と運用」

YARNを利用した場合、ジョブがどのようなオブジェクトを介して、Executorで実行されるのかが図解されているのがためになります。DAGSchedulerを介してCoarseGrainedExecutorBackendを通り、最終的にExecutorのスレッドプール上でタスクが起動される、のような。

5章「DataFrameとSpark SQL

DataFrameのAPIについて詳しく解説されておりためになりました。DataFrameのAPIについては公式ドキュメントでも最低限のことしか書かれておらず、実際に使おうとするときに困ってしまい、結局ソースコードのdocsやコードそのものを見るハメになってしまいます。

本章で紹介されているDataFrameNaFunctionsとかとても便利なんですが、普通にドキュメント読んだだけではその存在を知りえませんw

8章「MLlib」

spark.mllibと、新しいAPIであるspark.mlの両方を解説しています。また、TF-IDFやOneHotEncoderなど、特徴抽出や変換のアルゴリズムについても丁寧に解説されているのが勉強になります。ケーススタディ複数あるのも理解を助けますね。

締め

Spark SQLやMLlibを仕事で使っている私のようなある程度使っているユーザーにとってもためになる内容が多かったです。是非!

低レベルな技術も大事ですよね。

昔お世話になった方が書いていたので読んでみました。

www.wantedly.com

感想を適当に。

低レベルな技術を身につけよう

マネージドサービスや、高レベルなAPIを備えたミドルウェアに囲まれて生きている我々には耳の痛い話です。

全員がLinuxカーネルについて深い知識を得る必要があるとは思いませんが、結局何を使うにも、真面目に使うなら中身の仕組みをある程度知らないと運用していけないですよね。

Sparkなんかは高レベルなAPIで簡単に大規模分散処理ができそうな雰囲気を出してきますが、ある程度中身の仕組みを知っていないとチューニングも運用もできません。

MySQLなんかもそうですよね。

早く、多く失敗しよう

そうですね。やりたいことはどんどん手を上げてやるべきだと思います。

しかしながら、やりたいことを思いつくにはある程度のインプットがないとできないように感じます。 例えば「Docker導入したい!」と思い至るには、Dockerについての知識や事例、現在のプロダクトが抱えている問題に対する理解がないと思い至らないですよね。

ということで、広く浅く色々なものを貪ることも大事かなと思いました。

もっとも大切なことに集中しよう

B to Cな会社で働いたことがないのでよくわかりませんが、この辺はとても苦労してそうです。

優先順位付けとなにをやらないかの交渉は必須ですよね。

SparkのShuffleについて調べてみる (1:概要とLogical Plan)

Sparkでは、reduceByKeyやgroupByKeyのような特定の処理を行う場合、シャッフルと呼ばれる処理が実行されます。 これの詳細について自分なりにある程度詳細な理解がしたかったのでまとめてみます。

Sparkのバージョンは1.6.1です。

SparkでのShuffleの概要

SparkにおけるShuffleは、あるRDDに対して操作を行う際にデータの再分散を行うことといえます。

例えば、mapは各RDDに対してmap処理を行うことで事足ります。この時、データの移動は必要ないのでShuffleは発生しません。(図の青い箱がRDD、丸い箱がパーティションです)

f:id:Attsun_1031:20160325001132p:plain

一方、reduceByKeyでは、指定されたキーでの集計が必要となるため、クラスタ上に散らばっている各RDDをキーごとにまとめる必要があり、この時にネットワーク越しのデータ転送、つまりShuffleが行なわれます。

f:id:Attsun_1031:20160325001649p:plain

Shuffleはネットワーク越しのデータ転送が行なわれますので、特に巨大なデータに対してShuffleをする場合は大量のディスクIOやネットワークIOが発生することになるので、パフォーマンスに影響を与えやすいです。

Sparkアプリケーションの実行フロー

Sparkアプリケーションはどのようにして実行されるのでしょう?大まかに分けると、3つのフェーズから成り立ちます。

  1. プログラムから、RDDの依存グラフ(DAG)を作成(Logical Plan)
  2. 依存グラフをステージやタスクといった単位に分解(Physical Plan)
  3. Physical Planに沿って実行

この各フェーズについて、Shuffleがどのように扱われているかを見ていきたいと思います。

Logical Planから見たShuffle

まず、Logical PlanでのShuffleについて見てみましょう。

Logical Planでは、RDDの変換処理からDAGを生成します。例えばmap処理の場合、先ほどの最初の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001132p:plain

一方、Shuffleが伴う処理がおこなれた場合、2つ目の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001649p:plain

この時、SparkではDAGの他に、RDD間にどのような依存関係があるかという情報を保持しています。(RDDクラスのdependenciesメソッドにより取得できます)

依存関係には大きく分けて2種類あります。

NarrowDependency

子のRDDの各パーティションが、親のRDDのどれかのパーティション全てに依存している

ShuffleDependency

子のRDDの各パーティションが、親のRDDの複数のパーティションの一部に依存している

名前から見てわかる通り、Shuffleを伴う操作がある場合、その親との依存関係はShuffleDependencyとなります。

「一部に依存している」とは、例えばreduceByKeyのようにキーごとに集計する場合は同じキーのデータが同じパーティションに入るようにする必要があるので、親の各パーティションからそのキーのデータのみ(つまり一部)を抜き出して新しいパーティションに引っ張ってくる、ということを意味します。

以下のリンクの右下の図が、ShuffleDependencyです。図にするとわかりやすいですね。

https://github.com/JerryLead/SparkInternals/blob/master/PNGfigures/Dependency.png

ShuffleDependencyは、Shuffleされたデータをどのように集約するかやmap side combine(データ転送の前にローカルのデータのみで集計を済ませておくことでネットワーク転送量を減らす仕掛け)を行うか、といった情報を保持しています。

依存関係が構築される例

例えば、map処理は(NarrowDependencyの子クラスである)OneToOneDepenencyを持ちます。

scala> sc.parallelize(Array(1,2,3)).map(_ + 1).dependencies
res19: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@6a11e139)

一方、reduceByKeyにより生成されるRDDはShuffleDependencyを持ちます。

scala> sc.parallelize(Array(("x", 1), ("y", 2), ("x", 3))).reduceByKey(_ + _).dependencies
res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1a47ddfb)

mapのように必ずどの依存関係を持つか決まっているものもありますが、例えばjoinはpartitionerが同じならNarrowDependencyになりますし、そうでないならShuffleDependencyになります。これは、もし2つのRDDが同じようにパーティショニングされているならデータを再分散する必要がないからです。

Spark1.6 - Datasetの導入とメモリ管理の変更

spark1.6がリリースされました。

Spark 1.6.0 released | Apache Spark

詳しい変更点は上記を見ていただくとして、ここではDatasetとメモリ管理方式の変更について紹介します。

メモリ管理の変更

Spark1.6ではメモリ(ヒープ)の管理が柔軟になり、より効率的にメモリを使用できるようになりました。

1.5までの世界

Sparkではメモリの使用領域が主に2つの領域に分かれています。

ストレージ領域

RDDをメモリにキャッシュする際に使われる領域です。

実行領域

シャッフルなど、タスクの実行に使われる領域です。

従来、この2つの領域は厳密に分断されていたため、チューニングを誤ると全体としてはメモリが余っているにもかかわらず、無駄なディスク書き込みが発生するなどしてしまいました。

1.6からの世界

1.6ではこの管理方式に変更が加えられ、お互いがお互いの領域を使用することが可能になりました。

また、ストレージ領域がspark.memory.storageFractionを超えてメモリを使用している場合、実行領域はストレージ領域の使用メモリがspark.memory.storageFractionを上回っている限り横取りすることができるそうです。(逆は不可能)

重要な設定値

spark.memory.fraction

JVMのうち、Sparkが使用できるメモリの割合を示す。

spark.memory.storageFraction

spark.memory.fractionのうち、ストレージ領域に最低どれだけ確保できるかを示す。

spark.memory.useLegacyMode

trueに設定することで、1.5以前の古いメモリ管理方法を採用できる。

Dataset API

1.6で初めて追加された試験的な機能です。pythonは未サポート。

Datasetを使うことで、素のRDDがもたらす静的な型チェックと、DataFrameがもたらす実行の最適化の両方の恩恵が受けられます。

型付けについて詳しくみてみましょう。

静的な型チェック

静的な型チェックが得られるとはどういうことでしょうか?このメリットを理解するには、まずDataFrameの問題点を理解する必要があります。

DataFrameの例

例えば、名前(String)と年齢(Long)の2つのフィールドを持つPersonスキーマを外部から読み込むとしましょう。(このjsonは、sparkのexapmleディレクトリに含まれているものです)

scala> case class Person(name: String, age: Long)
scala> val path = "examples/src/main/resources/people.json"
scala> val people: DataFrame = sqlContext.read.json(path)

次は、このpeopleからage列のみを取り出すメソッドを定義したいとします。

scala> def collectAge(d: DataFrame): DataFrame = d.select($"age")

一見問題ないように見えますが、実はここに危険があります。

まず、戻ってくる型がDataFrameという非常に抽象的な型であるため、実装をよく見ないと実際どのようなスキーマDataFrameが戻ってきているかわからず、当然静的な型チェックも効きません。

例えば以下のような定義はすべてコンパイルを通過します。

// ageではなくnameを返してしまっているが、コンパイルできる
scala> def collectAge(d: DataFrame): DataFrame = d.select($"name")
 
// 存在しないフィールドを得るメソッドもコンパイルは通過する
scala> def collectAge(d: DataFrame): DataFrame = d.select($"xxx")
// 実行時にエラーとなる。
scala> collectAge(people)
org.apache.spark.sql.AnalysisException: cannot resolve 'xxx' given input columns age, name;

DataFrameに対する操作が複雑になればなるほど、今扱っているDataFrameが一体どのようなものなのか、正しくスキーマを守れているのか、といったことがわかなくなります。

Datasetの例

Datasetはこれを解決します。

まずはDatasetを作ってみましょう。DataFrame.asによりDatasetに変換できます。

scala> val people: Dataset[Person] = sqlContext.read.json(path).as[Person]

次に、DataFrameの例のときと同様にageを取り出す関数の定義です。

scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.age)

見てお分かりだと思いますが、戻り値のDatasetがどのような型を持っているか明示されています。

これにより、DataFrameの時に問題になった間違ったメソッド定義はすべてコンパイル時にチェックされます。

// 戻り値の型と一致しないのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
<console>:32: error: type mismatch;
 found   : String
 required: Long
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
 
// Personに定義されていないフィールドを参照しようとしているのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)
<console>:32: error: value xxx is not a member of Person
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)

これにより、SparkSQLの最適化の恩恵を受けながら、静的な方チェックの恩恵を受けることにより、安全にプログラムを書くことが可能になりました。めでたしめでたし。

Scala占い

あけましておめでとうございます。 恒例の占い企画です。 2013年はPython、2014年はHaskell、2015年はGoで占いをしました。今年はScalaです。

scala> import scala.util.Random
import scala.util.Random

scala> Random.shuffle(List("大凶", "凶", "大吉", "中吉", "小吉", "吉", "マジ吉")).head
res0: String = 大吉

素晴らしい!!

今年もよろしくお願いします。