The Dabsong Conshirtoe

技術系の話を主にします。

PyConJP 2016で「Pythonで入門するApache Spark」を話した

1ヶ月近く過ぎてしまいましたが、今年のPyConJPで、「Pythonで入門するApache Spark」というタイトルでスピーカーを務めさせていただきました。

資料

Jupyterコード

動画

PyConJPには参加者として2012年くらいから参加していましたが、スピーカーとして参加するのははじめてでした。大きなカンファレンスで一度話してみたいという願望があったので、叶ってよかったです。立ち見の方もいらっしゃるなど、思った以上に人が集まりとても緊張しましたが、良い経験をさせていただきました。

発表資料の準備をすることで、Sparkの基礎をもう一度振り返ることができたのが良かったかな。時には人前にさらされることも大事ですね。

運営の皆様、発表準備を手伝っていただいた皆様、どうもありがとうございました。

会社ブログでも紹介していただきました。ありがとうございます!

SparkのShuffleについて調べてみる (4:Shuffle Readの実装探検)

前回の記事では、SparkのShuffle Writeの実装を追ってみました。

今回は、Shuffle Readの実装について調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

Shuffle Read

Shuffle Readは、Shuffle Writeにより書き出されたデータを読み出す処理です。処理するタスクの前段のステージがShuffleステージである場合に行われます。

Shuffle Readは、Sort ShuffleでもHash Shuffleでも同じクラス(BlockStoreShuffleReader)が使用されます。

データのフェッチ

BlockStoreShuffleReaderは、ShuffleBlockFetcherIteratorを使用して対象のデータ(ブロックIDとデータストリーム)を逐次取得します。この時、リモートにあるデータを一度にフェッチするデータの最大はspark.reducer.maxSizeInFlightにより制御されます。この値が大きいほどフェッチの効率はよくなりますが、メモリへのプレッシャーが増します。実際には、最大で5ノードへ同時にリクエストを送るためにこの値を5で割った数を上限としているようですが、どこでその「最大5ノード」という制限をしているかは不明でした。

どのBlockManagerからどのBlockIDを抜けばいいかは、MapOutputTrackerというmapタスクの出力を管理するオブジェクトが教えてくれます。

また、Shuffle Writeの方式によってデータの書き出され方は異なりますが、ここの違いはBlockManagerの中でShuffleBlockResolverにより吸収されています。例えば、Sort Shuffleの場合は前回紹介したIndexShuffleBlockResolverが使用されます。

フェッチされた各データは解凍とdesrializeがされ、キーに応じた集約が行われます。Map Side Combineが行なわれている場合はマージされた結果同士のマージ、そうでない場合は個々の値のマージが行われます。

データのマージ

マージにはExternalAppendOnlyMapというオブジェクトが使用されます。このオブジェクトは値のマージ方法を知っており、データを外部から受け入れてマージされた結果を返します。データが一定の水準に達するとディスクに退避するところが特徴的です。Shuffle Writeで登場したExternalSorterもデータがメモリに収まらない場合はディスクに退避する機能を持っていますが、どちらも同じSpillableというtraitで実現しています。

また、ExternalSorterでもデータの保持にはPartitionedAppendOnlyMapとうオブジェクトを使用しており、先のExternalAppendOnlyMapと同じ親(AppendOnlyMap)を持ちます。AppendOnlyMapではKey-ValueのデータがシンプルなArrayとして記録されるオープンアドレスのハッシュテーブルです。基本的にはキーが入っているインデックスの一つ後ろに値が入っているようです。

キーのポジションは、キーのハッシュ値とQuadratic Probingというキーの衝突回避の手法により決定されているようです。

こうしてマージされたデータがBlockStoreShuffleReaderから返却されます。sortByKeyのようなソート順序の指定がある場合、ExternalSorterによりソートが行われた上で返却されます。

以上です。ExternalAppendOnlyMapでspillしたデータとオンメモリのデータをマージするところなど、まだまだ深く潜り込めるポイントはたくさんありそうですが、ひとまずこの辺で。

SparkのShuffleについて調べてみる (3:Shuffle Writeの実装探検)

前回の記事では、SparkのShuffleについて、Physical Planから見た内容についてまとめました。

今回は、実行時の観点からのShuffle Writeについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

実行時のShuffleの流れ

Shuffleはどのように実現されているのかを簡単に見ると、以下の流れとなります。

  1. 各TaskがShuffleのキーごとにデータをファイルに書き出す(Shuffle Write)
  2. reducerごとに担当するキーのデータファイルを読み込み、処理を行う(Shuffle Read)

Shuffle Write

もう少し詳細に流れを追うと、以下の流れとなります。

  1. ShuffleMapTaskが起動されます。
  2. RDDのデータを読み込む。この時、前段のステージがShuffleの場合、ShuffleReaderからデータを取得します(Shuffle Read)。そうでない場合、入力ソースからデータを読み込みます。
  3. 得たデータをShuffleWriterを使って出力します。

ShuffleMapTaskの起動について

まず、TaskにはShuffleMapTaskとResultTaskの2種類の実装があります。ResultTaskは、foreachやcollectのような、ジョブの最終ステージを実行するタスクです。一方、ShuffleMapTaskはそれ以外のステージで実行されるタスクです。

DAGSchedulerでTaskを生成する際、対象のステージがShuffleMapStageである場合に生成されているようです。

ShuffleWriterを使った出力について

ShuffleReaderを使った読み込みについては後半で説明するので一旦スキップします。

ShuffleWriterによる出力について

どのようにデータをShuffleするかについて、Sparkではhash, sort, tungsten-sortの3つの実装がされています。どの実装を選択するかは、spark.shuffle.managerにより制御できます。

Hash Shuffle

Spark 1.2.0以前のデフォルト実装です。各mapタスクが各reducer(Shuffleされたデータを受け取る側)ごとにファイルを作成します。ただ、この方式ではMap Taskの数とReducerの数を掛け合わせた量だけファイルが生成されてしまうため、各ファイルを作ったり消したりするところでのオーバーヘッドが大きく、特にパーティション数が大量にある場合に問題なるケースが多かった様です。

spark.shuffle.consolidateFilesという、この問題に対処するオプションも1.6.0以前のバージョンでは存在しましたが、後述のsort shuffleの存在等の理由により削除されています。

Sort Shuffle

spark.shuffle.managerのデフォルト値です。上記のHash Shuffleの問題に対処するために導入されました。各Mapタスクごとにパーティション、キーでソートされたデータを書き出します。同時に、各ファイルごとにどのパーティションがどこから始まるかわかるようにインデックスファイルを別で書き出します。各Reducerは、このインデックスファイルを頼りに必要なパーティションのデータを読む形になります。

具体的に実装を見ていくと、この一連の処理を担うのが、SortShuffleWriterです。SortShuffleWriterは、ExternalSorterを使ってデータの書き込みを行います。

ExternalSorterでは、MapSideCombineが有効な場合、データはPartitionedAppendOnlyMapを使ってキーごとにマージされた形式でオンメモリに保持されます。内部で持つデータサイズが一定量に達した場合、ExternalSorterはspillという処理により、現在のデータをファイルに書き出し、新しいPartitionedAppendOnlyMapを生成します。この時の書き込みバッファはspark.shuffle.file.bufferspark.shuffle.spill.batchSizeにより制御されているようです。

spillにより作成されたファイルは、すべてマージソートされて一つのファイルとして改めて書き出されます。spillされたファイルは書き込み処理完了後に削除されます。

ファイルをすべて書き出したら、IndexShuffleBlockResolverを使ってインデックスファイルを書き出します。

spillの存在によりオンメモリに乗り切らないデータも安全にシャッフルできるのですが、余計なIOやマージソートが発生するため、処理速度になかなかのインパクトを及ぼします。(個人的にも、spillを削減することで処理速度が数倍早くなったケースがありました。)また、ディスクに小さいファイルを書きまくるのでディスクも圧迫します。spillの情報はSparkのUIから閲覧可能ですので、チューニングの際には参考にすると良いでしょう。

ByPass Merge Sort Shuffle

SortShuffleWriterが使われる設定で以下の条件を満たした時、BypassMergeSortShuffleWriterという特殊な実装が使用されます。

旧来のHash Shuffleと同じく、パーティションの数だけファイルを作成し、最後にそれらのファイルのマージと、パーティションの位置を示すためのインデックスファイルを作るだけというシンプルなものです。

SortShuffleWriteと異なり、データをインメモリに保持する必要がなく、spillも発生しないため効率的です。反面、Hash Shuffleと同じく大量のファイルが作成される恐れがあるため、少ないパーティション数の場合のみ有効といえます。

Tungsten Sort Shuffle

Project Tungstenの一環で、オフヒープを使ったより効率的な実装です。spark.shuffle.managertungsten-sortを指定することで有効になる、という記述もいくつか見ますが、実際にコードを見てみるとsortでもtungsten-sortでも同じ実装(SortShuffleManager)が使われていました。(ここは単に私の読み間違いかもしれません)

実装を見る限り、以下の条件を満たす事でUnsafeShuffleWriterが使用されるようです。

  • シリアライズされたオブジェクトを整列した結果が、シリアライズする前に整列した結果と一致すること(ちゃんと理解できでるか微妙です。ドキュメントを見る限り、シリアライズしたオブジェクトのバイトストリームに対して、ソートなどして順序を変更することが可能か、ということでしょうか)
  • Shuffleの処理に集約を含まない
  • パーティションの数が16,777,216より少ない

内部ではShuffleExternalSorterが使われています。Sort Shuffleで出てきたExternalSorterと同じく、内部のレコードはパーティションIDでソートされ、単一のファイルに出力が行われます。

この実装ではserializeされたオブジェクトを直接扱うため、Sort Shuffleで必要とされたserialize/desrializeの処理が不要となります。spillのマージには、FileStreamを使うバージョンとNIOのFileChannelを使うバージョンがあるようです。

また、データはオフヒープに格納され、そのアドレスとパーティションIDがShuffleMemorySorterという専用のSorterに格納されます。アドレスとパーティションIDは単一のLong型に変換されるため、1レコードで使うサイズは8バイト(!)です。

データを書き出す際は、TimSortにより内部の先ほどのデータがソートされます。

SparkのShuffleについて調べてみる (2:Physical Plan)

前回の記事では、SparkのShuffleについての概要と、Logical Planから見たShuffleについてまとめました。

今回は、Physical Planから見たShuffleについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

Physical Planについて

Logical Planで作成したDAGを、実際にどのような粒度のタスクに分解し、どのように実行するのかを定めたものがPhysical Planです。

DAGは、いくつかのStageに分解され、さらにそのStageではRDDパーティションごとにTaskという単位で処理が実行されます。

Physical PlanにおけるShuffle

Physical Planにおいて、ShuffleはStageとStageを分断する境界線となります。

MapReduceと異なり、Sparkでは複数の処理をまとめて1つの処理として実行することが可能です。例えば、以下のようなコードを実行するとしましょう。

rdd.map(x => x + 1).filter(x < 0).map(x ** 2)

map, filter, mapという3つの処理がありますが、SparkではMapReduceのようにそれぞれの処理の中間データについて保持する必要がなく、1つの処理としてまとめて一気に一つのタスクとして実行します。このまとめられた単位がStageで、それぞれのパーティションごとに実行される個々の処理がTaskです。

一方、Shuffleを伴う処理では話が異なります。

rdd.map(x => (x, x * 2)).mapValues(x => x + 3).reduceByKey(_ + _).filter((x,y) => y > 1)

この場合、mapValuesまではまとめることができますが、reduceByKeyではパーティションをまたいだデータのマージ処理が行われるため、reduceByKey以降の処理を1つにまとめることはできません。逆に、mapやfilterはデータがパーティションをまたぐことがないので、ローカルで完結する処理として一つにまとめることができるのです。

reduceByKeyに限らず、ShuffleDependencyを作る処理ではすべて同じ制約があるため、ShuffleはStageを分断する境界線となります。

実装を追う

mapやfilterの処理が1つのTaskとしてまとめられるような、Shuffleがない場合に複数の処理を1つにまとめる処理について、どのように実装されているのか軽く調べてみました。

例として、以下の処理を考えてみます。

sc.parallelize(Array(1,2,3)).map(x => x + 1).filter(x > 3).collect()

map, filterの多段処理になっています。

map, filterの実装を見ると、どちらもMapPartitionRDDクラスを生成しています。

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

また、生成時に自分自身を親として登録しています。mapやfilterに渡した関数を実行する関数の登録も行っています。Sparkは遅延実行なので、この時点ではRDDオブジェクトのチェーンが構築されるのみです。

map, filterを実行したところで、以下のようなチェーンのRDDが構築されています。

scala> r.toDebugString
res1: String =
(4) MapPartitionsRDD[2] at filter at <console>:27 []
 |  MapPartitionsRDD[1] at map at <console>:27 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

このRDDに対してcollectを実行すると実際の計算が行われます。実際にRDDの計算が始まるまでをcollectメソッドから一つ一つ追っていくのは大変ですが、結果的にはRDD.computeがコールされます。computeはイテレータを返します。

MapPartitionRDDクラスはcomputeを以下のような実装でオーバーライドしています。

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

firstParentとは、このRDDの親にあたるRDDです。filterから見ればmap実行時に作成されたRDD、mapから見ればparallelizeで作成されたRDDです。

fはMapPartitionRDD作成時に登録した関数で、最後の引数(イテレータ)に対して関数を適用します。この対象のイテレータは見ての通り親RDDiteratorメソッドにより作成されるものですが、内部では親RDDのcomputeが実行されます。

こうして、末端の子からその親をたどっていき、イテレータに対する処理のチェーンを作ることで、中間データを必要としない1つの処理としてまとめあげているようでした。

「詳解Apache Spark」刊行

技術評論社から「詳解Apache Spark」が刊行されました!

www.amazon.co.jp

今回、私は内容のレビューという形でこの本の出版に関わらせていただきました。

総評

Apache Sparkについてなかなか網羅的にまとめられている良本です。Sparkは情報が出てきているとはいえ、まだまだ英語ソースのものが多かったりします。また、バージョンアップのたびに様々な新機能の導入を行っており、情報が古くなりがちです。本書は1.6.0を対象に執筆されており、現時点の最新は1.6.1なのでほぼ最新の内容といえます。

Sparkの導入を考えているエンジニアはとりあえず手元に置いておいても損ないのではないでしょうか。

章立ては以下です。

  • 第1章 Sparkの基礎
  • 第2章 Sparkの導入
  • 第3章 RDD
  • 第4章 クラスタ構築と運用
  • 第5章 DataFrameとSpark SQL
  • 第6章 Spark Streaming
  • 第7章 MLlib
  • 第8章 GraphX
  • Appendix YARN

RDDのようなcoreの部分はもとより、SparkSQL, Spark Streaming, MLlib, GraphXといった主要コンポーネントすべてを網羅しています。さらに、それぞれについてサラッとではなく、基礎から応用までしっかりと説明されているのがとても良いです。

個人的に良かった章

4章「クラスタ構築と運用」

YARNを利用した場合、ジョブがどのようなオブジェクトを介して、Executorで実行されるのかが図解されているのがためになります。DAGSchedulerを介してCoarseGrainedExecutorBackendを通り、最終的にExecutorのスレッドプール上でタスクが起動される、のような。

5章「DataFrameとSpark SQL

DataFrameのAPIについて詳しく解説されておりためになりました。DataFrameのAPIについては公式ドキュメントでも最低限のことしか書かれておらず、実際に使おうとするときに困ってしまい、結局ソースコードのdocsやコードそのものを見るハメになってしまいます。

本章で紹介されているDataFrameNaFunctionsとかとても便利なんですが、普通にドキュメント読んだだけではその存在を知りえませんw

8章「MLlib」

spark.mllibと、新しいAPIであるspark.mlの両方を解説しています。また、TF-IDFやOneHotEncoderなど、特徴抽出や変換のアルゴリズムについても丁寧に解説されているのが勉強になります。ケーススタディ複数あるのも理解を助けますね。

締め

Spark SQLやMLlibを仕事で使っている私のようなある程度使っているユーザーにとってもためになる内容が多かったです。是非!

低レベルな技術も大事ですよね。

昔お世話になった方が書いていたので読んでみました。

www.wantedly.com

感想を適当に。

低レベルな技術を身につけよう

マネージドサービスや、高レベルなAPIを備えたミドルウェアに囲まれて生きている我々には耳の痛い話です。

全員がLinuxカーネルについて深い知識を得る必要があるとは思いませんが、結局何を使うにも、真面目に使うなら中身の仕組みをある程度知らないと運用していけないですよね。

Sparkなんかは高レベルなAPIで簡単に大規模分散処理ができそうな雰囲気を出してきますが、ある程度中身の仕組みを知っていないとチューニングも運用もできません。

MySQLなんかもそうですよね。

早く、多く失敗しよう

そうですね。やりたいことはどんどん手を上げてやるべきだと思います。

しかしながら、やりたいことを思いつくにはある程度のインプットがないとできないように感じます。 例えば「Docker導入したい!」と思い至るには、Dockerについての知識や事例、現在のプロダクトが抱えている問題に対する理解がないと思い至らないですよね。

ということで、広く浅く色々なものを貪ることも大事かなと思いました。

もっとも大切なことに集中しよう

B to Cな会社で働いたことがないのでよくわかりませんが、この辺はとても苦労してそうです。

優先順位付けとなにをやらないかの交渉は必須ですよね。

SparkのShuffleについて調べてみる (1:概要とLogical Plan)

Sparkでは、reduceByKeyやgroupByKeyのような特定の処理を行う場合、シャッフルと呼ばれる処理が実行されます。 これの詳細について自分なりにある程度詳細な理解がしたかったのでまとめてみます。

Sparkのバージョンは1.6.1です。

SparkでのShuffleの概要

SparkにおけるShuffleは、あるRDDに対して操作を行う際にデータの再分散を行うことといえます。

例えば、mapは各RDDに対してmap処理を行うことで事足ります。この時、データの移動は必要ないのでShuffleは発生しません。(図の青い箱がRDD、丸い箱がパーティションです)

f:id:Attsun_1031:20160325001132p:plain

一方、reduceByKeyでは、指定されたキーでの集計が必要となるため、クラスタ上に散らばっている各RDDをキーごとにまとめる必要があり、この時にネットワーク越しのデータ転送、つまりShuffleが行なわれます。

f:id:Attsun_1031:20160325001649p:plain

Shuffleはネットワーク越しのデータ転送が行なわれますので、特に巨大なデータに対してShuffleをする場合は大量のディスクIOやネットワークIOが発生することになるので、パフォーマンスに影響を与えやすいです。

Sparkアプリケーションの実行フロー

Sparkアプリケーションはどのようにして実行されるのでしょう?大まかに分けると、3つのフェーズから成り立ちます。

  1. プログラムから、RDDの依存グラフ(DAG)を作成(Logical Plan)
  2. 依存グラフをステージやタスクといった単位に分解(Physical Plan)
  3. Physical Planに沿って実行

この各フェーズについて、Shuffleがどのように扱われているかを見ていきたいと思います。

Logical Planから見たShuffle

まず、Logical PlanでのShuffleについて見てみましょう。

Logical Planでは、RDDの変換処理からDAGを生成します。例えばmap処理の場合、先ほどの最初の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001132p:plain

一方、Shuffleが伴う処理がおこなれた場合、2つ目の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001649p:plain

この時、SparkではDAGの他に、RDD間にどのような依存関係があるかという情報を保持しています。(RDDクラスのdependenciesメソッドにより取得できます)

依存関係には大きく分けて2種類あります。

NarrowDependency

子のRDDの各パーティションが、親のRDDのどれかのパーティション全てに依存している

ShuffleDependency

子のRDDの各パーティションが、親のRDDの複数のパーティションの一部に依存している

名前から見てわかる通り、Shuffleを伴う操作がある場合、その親との依存関係はShuffleDependencyとなります。

「一部に依存している」とは、例えばreduceByKeyのようにキーごとに集計する場合は同じキーのデータが同じパーティションに入るようにする必要があるので、親の各パーティションからそのキーのデータのみ(つまり一部)を抜き出して新しいパーティションに引っ張ってくる、ということを意味します。

以下のリンクの右下の図が、ShuffleDependencyです。図にするとわかりやすいですね。

https://github.com/JerryLead/SparkInternals/blob/master/PNGfigures/Dependency.png

ShuffleDependencyは、Shuffleされたデータをどのように集約するかやmap side combine(データ転送の前にローカルのデータのみで集計を済ませておくことでネットワーク転送量を減らす仕掛け)を行うか、といった情報を保持しています。

依存関係が構築される例

例えば、map処理は(NarrowDependencyの子クラスである)OneToOneDepenencyを持ちます。

scala> sc.parallelize(Array(1,2,3)).map(_ + 1).dependencies
res19: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@6a11e139)

一方、reduceByKeyにより生成されるRDDはShuffleDependencyを持ちます。

scala> sc.parallelize(Array(("x", 1), ("y", 2), ("x", 3))).reduceByKey(_ + _).dependencies
res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1a47ddfb)

mapのように必ずどの依存関係を持つか決まっているものもありますが、例えばjoinはpartitionerが同じならNarrowDependencyになりますし、そうでないならShuffleDependencyになります。これは、もし2つのRDDが同じようにパーティショニングされているならデータを再分散する必要がないからです。