The Dabsong Conshirtoe

技術系の話を主にします。

SparkのShuffleについて調べてみる (2:Physical Plan)

前回の記事では、SparkのShuffleについての概要と、Logical Planから見たShuffleについてまとめました。

今回は、Physical Planから見たShuffleについて調べていきたいと思います。(前回と同じく今回も個人的な理解の促進のためにこの日記を書いています。)

Physical Planについて

Logical Planで作成したDAGを、実際にどのような粒度のタスクに分解し、どのように実行するのかを定めたものがPhysical Planです。

DAGは、いくつかのStageに分解され、さらにそのStageではRDDパーティションごとにTaskという単位で処理が実行されます。

Physical PlanにおけるShuffle

Physical Planにおいて、ShuffleはStageとStageを分断する境界線となります。

MapReduceと異なり、Sparkでは複数の処理をまとめて1つの処理として実行することが可能です。例えば、以下のようなコードを実行するとしましょう。

rdd.map(x => x + 1).filter(x < 0).map(x ** 2)

map, filter, mapという3つの処理がありますが、SparkではMapReduceのようにそれぞれの処理の中間データについて保持する必要がなく、1つの処理としてまとめて一気に一つのタスクとして実行します。このまとめられた単位がStageで、それぞれのパーティションごとに実行される個々の処理がTaskです。

一方、Shuffleを伴う処理では話が異なります。

rdd.map(x => (x, x * 2)).mapValues(x => x + 3).reduceByKey(_ + _).filter((x,y) => y > 1)

この場合、mapValuesまではまとめることができますが、reduceByKeyではパーティションをまたいだデータのマージ処理が行われるため、reduceByKey以降の処理を1つにまとめることはできません。逆に、mapやfilterはデータがパーティションをまたぐことがないので、ローカルで完結する処理として一つにまとめることができるのです。

reduceByKeyに限らず、ShuffleDependencyを作る処理ではすべて同じ制約があるため、ShuffleはStageを分断する境界線となります。

実装を追う

mapやfilterの処理が1つのTaskとしてまとめられるような、Shuffleがない場合に複数の処理を1つにまとめる処理について、どのように実装されているのか軽く調べてみました。

例として、以下の処理を考えてみます。

sc.parallelize(Array(1,2,3)).map(x => x + 1).filter(x > 3).collect()

map, filterの多段処理になっています。

map, filterの実装を見ると、どちらもMapPartitionRDDクラスを生成しています。

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

また、生成時に自分自身を親として登録しています。mapやfilterに渡した関数を実行する関数の登録も行っています。Sparkは遅延実行なので、この時点ではRDDオブジェクトのチェーンが構築されるのみです。

map, filterを実行したところで、以下のようなチェーンのRDDが構築されています。

scala> r.toDebugString
res1: String =
(4) MapPartitionsRDD[2] at filter at <console>:27 []
 |  MapPartitionsRDD[1] at map at <console>:27 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

このRDDに対してcollectを実行すると実際の計算が行われます。実際にRDDの計算が始まるまでをcollectメソッドから一つ一つ追っていくのは大変ですが、結果的にはRDD.computeがコールされます。computeはイテレータを返します。

MapPartitionRDDクラスはcomputeを以下のような実装でオーバーライドしています。

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

firstParentとは、このRDDの親にあたるRDDです。filterから見ればmap実行時に作成されたRDD、mapから見ればparallelizeで作成されたRDDです。

fはMapPartitionRDD作成時に登録した関数で、最後の引数(イテレータ)に対して関数を適用します。この対象のイテレータは見ての通り親RDDiteratorメソッドにより作成されるものですが、内部では親RDDのcomputeが実行されます。

こうして、末端の子からその親をたどっていき、イテレータに対する処理のチェーンを作ることで、中間データを必要としない1つの処理としてまとめあげているようでした。

「詳解Apache Spark」刊行

技術評論社から「詳解Apache Spark」が刊行されました!

www.amazon.co.jp

今回、私は内容のレビューという形でこの本の出版に関わらせていただきました。

総評

Apache Sparkについてなかなか網羅的にまとめられている良本です。Sparkは情報が出てきているとはいえ、まだまだ英語ソースのものが多かったりします。また、バージョンアップのたびに様々な新機能の導入を行っており、情報が古くなりがちです。本書は1.6.0を対象に執筆されており、現時点の最新は1.6.1なのでほぼ最新の内容といえます。

Sparkの導入を考えているエンジニアはとりあえず手元に置いておいても損ないのではないでしょうか。

章立ては以下です。

  • 第1章 Sparkの基礎
  • 第2章 Sparkの導入
  • 第3章 RDD
  • 第4章 クラスタ構築と運用
  • 第5章 DataFrameとSpark SQL
  • 第6章 Spark Streaming
  • 第7章 MLlib
  • 第8章 GraphX
  • Appendix YARN

RDDのようなcoreの部分はもとより、SparkSQL, Spark Streaming, MLlib, GraphXといった主要コンポーネントすべてを網羅しています。さらに、それぞれについてサラッとではなく、基礎から応用までしっかりと説明されているのがとても良いです。

個人的に良かった章

4章「クラスタ構築と運用」

YARNを利用した場合、ジョブがどのようなオブジェクトを介して、Executorで実行されるのかが図解されているのがためになります。DAGSchedulerを介してCoarseGrainedExecutorBackendを通り、最終的にExecutorのスレッドプール上でタスクが起動される、のような。

5章「DataFrameとSpark SQL

DataFrameのAPIについて詳しく解説されておりためになりました。DataFrameのAPIについては公式ドキュメントでも最低限のことしか書かれておらず、実際に使おうとするときに困ってしまい、結局ソースコードのdocsやコードそのものを見るハメになってしまいます。

本章で紹介されているDataFrameNaFunctionsとかとても便利なんですが、普通にドキュメント読んだだけではその存在を知りえませんw

8章「MLlib」

spark.mllibと、新しいAPIであるspark.mlの両方を解説しています。また、TF-IDFやOneHotEncoderなど、特徴抽出や変換のアルゴリズムについても丁寧に解説されているのが勉強になります。ケーススタディ複数あるのも理解を助けますね。

締め

Spark SQLやMLlibを仕事で使っている私のようなある程度使っているユーザーにとってもためになる内容が多かったです。是非!

低レベルな技術も大事ですよね。

昔お世話になった方が書いていたので読んでみました。

www.wantedly.com

感想を適当に。

低レベルな技術を身につけよう

マネージドサービスや、高レベルなAPIを備えたミドルウェアに囲まれて生きている我々には耳の痛い話です。

全員がLinuxカーネルについて深い知識を得る必要があるとは思いませんが、結局何を使うにも、真面目に使うなら中身の仕組みをある程度知らないと運用していけないですよね。

Sparkなんかは高レベルなAPIで簡単に大規模分散処理ができそうな雰囲気を出してきますが、ある程度中身の仕組みを知っていないとチューニングも運用もできません。

MySQLなんかもそうですよね。

早く、多く失敗しよう

そうですね。やりたいことはどんどん手を上げてやるべきだと思います。

しかしながら、やりたいことを思いつくにはある程度のインプットがないとできないように感じます。 例えば「Docker導入したい!」と思い至るには、Dockerについての知識や事例、現在のプロダクトが抱えている問題に対する理解がないと思い至らないですよね。

ということで、広く浅く色々なものを貪ることも大事かなと思いました。

もっとも大切なことに集中しよう

B to Cな会社で働いたことがないのでよくわかりませんが、この辺はとても苦労してそうです。

優先順位付けとなにをやらないかの交渉は必須ですよね。

SparkのShuffleについて調べてみる (1:概要とLogical Plan)

Sparkでは、reduceByKeyやgroupByKeyのような特定の処理を行う場合、シャッフルと呼ばれる処理が実行されます。 これの詳細について自分なりにある程度詳細な理解がしたかったのでまとめてみます。

Sparkのバージョンは1.6.1です。

SparkでのShuffleの概要

SparkにおけるShuffleは、あるRDDに対して操作を行う際にデータの再分散を行うことといえます。

例えば、mapは各RDDに対してmap処理を行うことで事足ります。この時、データの移動は必要ないのでShuffleは発生しません。(図の青い箱がRDD、丸い箱がパーティションです)

f:id:Attsun_1031:20160325001132p:plain

一方、reduceByKeyでは、指定されたキーでの集計が必要となるため、クラスタ上に散らばっている各RDDをキーごとにまとめる必要があり、この時にネットワーク越しのデータ転送、つまりShuffleが行なわれます。

f:id:Attsun_1031:20160325001649p:plain

Shuffleはネットワーク越しのデータ転送が行なわれますので、特に巨大なデータに対してShuffleをする場合は大量のディスクIOやネットワークIOが発生することになるので、パフォーマンスに影響を与えやすいです。

Sparkアプリケーションの実行フロー

Sparkアプリケーションはどのようにして実行されるのでしょう?大まかに分けると、3つのフェーズから成り立ちます。

  1. プログラムから、RDDの依存グラフ(DAG)を作成(Logical Plan)
  2. 依存グラフをステージやタスクといった単位に分解(Physical Plan)
  3. Physical Planに沿って実行

この各フェーズについて、Shuffleがどのように扱われているかを見ていきたいと思います。

Logical Planから見たShuffle

まず、Logical PlanでのShuffleについて見てみましょう。

Logical Planでは、RDDの変換処理からDAGを生成します。例えばmap処理の場合、先ほどの最初の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001132p:plain

一方、Shuffleが伴う処理がおこなれた場合、2つ目の図のようなグラフが作られます。

f:id:Attsun_1031:20160325001649p:plain

この時、SparkではDAGの他に、RDD間にどのような依存関係があるかという情報を保持しています。(RDDクラスのdependenciesメソッドにより取得できます)

依存関係には大きく分けて2種類あります。

NarrowDependency

子のRDDの各パーティションが、親のRDDのどれかのパーティション全てに依存している

ShuffleDependency

子のRDDの各パーティションが、親のRDDの複数のパーティションの一部に依存している

名前から見てわかる通り、Shuffleを伴う操作がある場合、その親との依存関係はShuffleDependencyとなります。

「一部に依存している」とは、例えばreduceByKeyのようにキーごとに集計する場合は同じキーのデータが同じパーティションに入るようにする必要があるので、親の各パーティションからそのキーのデータのみ(つまり一部)を抜き出して新しいパーティションに引っ張ってくる、ということを意味します。

以下のリンクの右下の図が、ShuffleDependencyです。図にするとわかりやすいですね。

https://github.com/JerryLead/SparkInternals/blob/master/PNGfigures/Dependency.png

ShuffleDependencyは、Shuffleされたデータをどのように集約するかやmap side combine(データ転送の前にローカルのデータのみで集計を済ませておくことでネットワーク転送量を減らす仕掛け)を行うか、といった情報を保持しています。

依存関係が構築される例

例えば、map処理は(NarrowDependencyの子クラスである)OneToOneDepenencyを持ちます。

scala> sc.parallelize(Array(1,2,3)).map(_ + 1).dependencies
res19: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@6a11e139)

一方、reduceByKeyにより生成されるRDDはShuffleDependencyを持ちます。

scala> sc.parallelize(Array(("x", 1), ("y", 2), ("x", 3))).reduceByKey(_ + _).dependencies
res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1a47ddfb)

mapのように必ずどの依存関係を持つか決まっているものもありますが、例えばjoinはpartitionerが同じならNarrowDependencyになりますし、そうでないならShuffleDependencyになります。これは、もし2つのRDDが同じようにパーティショニングされているならデータを再分散する必要がないからです。

Spark1.6 - Datasetの導入とメモリ管理の変更

spark1.6がリリースされました。

Spark 1.6.0 released | Apache Spark

詳しい変更点は上記を見ていただくとして、ここではDatasetとメモリ管理方式の変更について紹介します。

メモリ管理の変更

Spark1.6ではメモリ(ヒープ)の管理が柔軟になり、より効率的にメモリを使用できるようになりました。

1.5までの世界

Sparkではメモリの使用領域が主に2つの領域に分かれています。

ストレージ領域

RDDをメモリにキャッシュする際に使われる領域です。

実行領域

シャッフルなど、タスクの実行に使われる領域です。

従来、この2つの領域は厳密に分断されていたため、チューニングを誤ると全体としてはメモリが余っているにもかかわらず、無駄なディスク書き込みが発生するなどしてしまいました。

1.6からの世界

1.6ではこの管理方式に変更が加えられ、お互いがお互いの領域を使用することが可能になりました。

また、ストレージ領域がspark.memory.storageFractionを超えてメモリを使用している場合、実行領域はストレージ領域の使用メモリがspark.memory.storageFractionを上回っている限り横取りすることができるそうです。(逆は不可能)

重要な設定値

spark.memory.fraction

JVMのうち、Sparkが使用できるメモリの割合を示す。

spark.memory.storageFraction

spark.memory.fractionのうち、ストレージ領域に最低どれだけ確保できるかを示す。

spark.memory.useLegacyMode

trueに設定することで、1.5以前の古いメモリ管理方法を採用できる。

Dataset API

1.6で初めて追加された試験的な機能です。pythonは未サポート。

Datasetを使うことで、素のRDDがもたらす静的な型チェックと、DataFrameがもたらす実行の最適化の両方の恩恵が受けられます。

型付けについて詳しくみてみましょう。

静的な型チェック

静的な型チェックが得られるとはどういうことでしょうか?このメリットを理解するには、まずDataFrameの問題点を理解する必要があります。

DataFrameの例

例えば、名前(String)と年齢(Long)の2つのフィールドを持つPersonスキーマを外部から読み込むとしましょう。(このjsonは、sparkのexapmleディレクトリに含まれているものです)

scala> case class Person(name: String, age: Long)
scala> val path = "examples/src/main/resources/people.json"
scala> val people: DataFrame = sqlContext.read.json(path)

次は、このpeopleからage列のみを取り出すメソッドを定義したいとします。

scala> def collectAge(d: DataFrame): DataFrame = d.select($"age")

一見問題ないように見えますが、実はここに危険があります。

まず、戻ってくる型がDataFrameという非常に抽象的な型であるため、実装をよく見ないと実際どのようなスキーマDataFrameが戻ってきているかわからず、当然静的な型チェックも効きません。

例えば以下のような定義はすべてコンパイルを通過します。

// ageではなくnameを返してしまっているが、コンパイルできる
scala> def collectAge(d: DataFrame): DataFrame = d.select($"name")
 
// 存在しないフィールドを得るメソッドもコンパイルは通過する
scala> def collectAge(d: DataFrame): DataFrame = d.select($"xxx")
// 実行時にエラーとなる。
scala> collectAge(people)
org.apache.spark.sql.AnalysisException: cannot resolve 'xxx' given input columns age, name;

DataFrameに対する操作が複雑になればなるほど、今扱っているDataFrameが一体どのようなものなのか、正しくスキーマを守れているのか、といったことがわかなくなります。

Datasetの例

Datasetはこれを解決します。

まずはDatasetを作ってみましょう。DataFrame.asによりDatasetに変換できます。

scala> val people: Dataset[Person] = sqlContext.read.json(path).as[Person]

次に、DataFrameの例のときと同様にageを取り出す関数の定義です。

scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.age)

見てお分かりだと思いますが、戻り値のDatasetがどのような型を持っているか明示されています。

これにより、DataFrameの時に問題になった間違ったメソッド定義はすべてコンパイル時にチェックされます。

// 戻り値の型と一致しないのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
<console>:32: error: type mismatch;
 found   : String
 required: Long
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
 
// Personに定義されていないフィールドを参照しようとしているのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)
<console>:32: error: value xxx is not a member of Person
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)

これにより、SparkSQLの最適化の恩恵を受けながら、静的な方チェックの恩恵を受けることにより、安全にプログラムを書くことが可能になりました。めでたしめでたし。

Scala占い

あけましておめでとうございます。 恒例の占い企画です。 2013年はPython、2014年はHaskell、2015年はGoで占いをしました。今年はScalaです。

scala> import scala.util.Random
import scala.util.Random

scala> Random.shuffle(List("大凶", "凶", "大吉", "中吉", "小吉", "吉", "マジ吉")).head
res0: String = 大吉

素晴らしい!!

今年もよろしくお願いします。

Kafkaのパーティンション数を決める時に考慮すること

kafkaのパーティション数を選択する際に考慮する点について、kafka作者のブログにまとめられていました。

blog.confluent.io

自身の理解のために、ちょっとまとめてみようと思います。

※この記事はあくまでも個人的なまとめですので、作者の原文に一度目を通すことをオススメします。

概要

考慮する点は、ざっくりまとめると以下があります。

必要とされるスループット

パーティションを追加することは、メッセージのproducerとconsumerの並列化につながるのでスループット向上が期待できます。

なら、max(t/p, t/c)パーティションを設定する必要があります。

将来的に必要となりそうなパーティション

設定しようとしているパーティション数が現在のスループットを満たすとしても、近い将来増加するだろうなという見込み何ある場合は、多めに設定しましょう。

なぜなら、パーティションをあとで増やすと、あるキーに割り振られるパーティションが変わってしまうためです。

ここからは自分の理解ですが、例えば、key=xというメッセージと、その次にkey=yというメッセージを受け、HashPartitionerのもとではどちらもpartition=1に割り振られていたとします。

この後、パーティション数を増やすと、xが割り当てられるパーティションの値が変わってしまい、key=xのメッセージを再度受けると別のパーティションに割り当てられてしまうため、key=xの次にkey=yが来る、というパーティション内で保障された順序同一性が崩れてしまう、ということでしょう。

kafkaの管理するファイルディスクリプタの数

kafkaは、パーティション1つにつきファイルを2つオープンする(データファイル、インデックスファイル)ので、osのファイルディスクリプタ上限を超えないように注意しましょう。

ノード障害からの復旧時間

kafkaのbrokerが停止した場合、そのbrokerがリーダーとなっていたパーティションは別のサーバーがリーダーとして振舞うようになるのですが、brokerの停止の仕方によって動きが異なります。

graceful shutdown

graceful shutdown(おそらくkafka-server-stop.shによる停止のこと)した場合、パーティション移動は一つずつ行われるので数msのダウンタイムで済む。また、ログをディスクに一旦書き出すことで再起動時のログリカバリプロセスを省くことができ、再起動を素早くできるメリットもあると公式ドキュメントにあります。

hard kill (kill -9)

強制終了した場合、停止したbrokerのパーティションはすべて同時にダウンしてしまいます。1パーティションのリーダー移動に5ms程度かかるので、1000パーティションあった場合は5秒のダウンタイムとなってしまいます。

更に停止したbrokerがcontroller(パーティションリーダーの管理を司る特別なbroker)だった場合、新しいcontrollerがZookeeperからメタデータを取得するために、2ms * パーティション数の時間が余計にかかります。

レプリケーションに伴う負荷

Kafkaはコミット(すべてのレプリケーションbrokerにメッセージが保存されること)されたメッセージのみconsumerに見せるのですが、レプリケーションはシングルスレッドで行われるので、1台のbrokerのレプリケーション数が多いほどレイテンシに影響します。

例えば、2台構成で2000パーティションある場合、片方は1000パーティションレプリケーションを行うので、結果としてだいたい20msスループットに影響します。これはサーバー台数を増やして1台あたりのレプリケーション数を削減することで軽減できます。

とはいえ、1メッセージは1パーティションにしか属さないので、1メッセージを受けた後に実行されるレプリケーションも1台あたり1回と考えられるため、レプリケーションよりも速いペースでメッセージが来ない限りは詰まらないんではとも思いました。

作者によると、パーティション数は100 * broker数 * レプリケーション数に収めるのが良いそうです。

Producerのバッファリングメッセージ

producerは毎回メッセージをbrokerに送るのではなくある程度バッファリングしてから送るのですが、このバッファリングはパーティションごとに行われるため、パーティション数を増加させるとproducerが全体でバッファリングするメッセージも増えることになります。

もし、事前に設定されたバッファリングに使えるメモリをオーバーしてしまった場合、新規の受信をブロックするかメッセージを落とすことになってしまいます。

これに関係のありそうなproducerのパラメータは以下。

  • buffer.memory:producerがバッファリングに使えるメモリ
  • block.on.buffer.full:trueの場合、バッファリングするメッセージ量がbuffer.memoryを超えたら受信をブロックする。falseなら例外投げる。
  • queue.buffering.max.messages:バッファに積める最大メッセージ量。

Sum up

いろいろ調べながら書いたので結構勉強なりました。Kafkaのトピック設計はなかなか奥が深そうですね。