Spark1.6 - Datasetの導入とメモリ管理の変更
spark1.6がリリースされました。
Spark 1.6.0 released | Apache Spark
詳しい変更点は上記を見ていただくとして、ここではDatasetとメモリ管理方式の変更について紹介します。
メモリ管理の変更
Spark1.6ではメモリ(ヒープ)の管理が柔軟になり、より効率的にメモリを使用できるようになりました。
1.5までの世界
Sparkではメモリの使用領域が主に2つの領域に分かれています。
ストレージ領域
RDDをメモリにキャッシュする際に使われる領域です。
実行領域
シャッフルなど、タスクの実行に使われる領域です。
従来、この2つの領域は厳密に分断されていたため、チューニングを誤ると全体としてはメモリが余っているにもかかわらず、無駄なディスク書き込みが発生するなどしてしまいました。
1.6からの世界
1.6ではこの管理方式に変更が加えられ、お互いがお互いの領域を使用することが可能になりました。
また、ストレージ領域がspark.memory.storageFraction
を超えてメモリを使用している場合、実行領域はストレージ領域の使用メモリがspark.memory.storageFraction
を上回っている限り横取りすることができるそうです。(逆は不可能)
重要な設定値
spark.memory.fraction
JVMのうち、Sparkが使用できるメモリの割合を示す。
spark.memory.storageFraction
spark.memory.fraction
のうち、ストレージ領域に最低どれだけ確保できるかを示す。
spark.memory.useLegacyMode
trueに設定することで、1.5以前の古いメモリ管理方法を採用できる。
Dataset API
1.6で初めて追加された試験的な機能です。pythonは未サポート。
Dataset
を使うことで、素のRDD
がもたらす静的な型チェックと、DataFrame
がもたらす実行の最適化の両方の恩恵が受けられます。
型付けについて詳しくみてみましょう。
静的な型チェック
静的な型チェックが得られるとはどういうことでしょうか?このメリットを理解するには、まずDataFrame
の問題点を理解する必要があります。
DataFrameの例
例えば、名前(String)と年齢(Long)の2つのフィールドを持つPerson
スキーマを外部から読み込むとしましょう。(このjsonは、sparkのexapmleディレクトリに含まれているものです)
scala> case class Person(name: String, age: Long) scala> val path = "examples/src/main/resources/people.json" scala> val people: DataFrame = sqlContext.read.json(path)
次は、このpeople
からage列のみを取り出すメソッドを定義したいとします。
scala> def collectAge(d: DataFrame): DataFrame = d.select($"age")
一見問題ないように見えますが、実はここに危険があります。
まず、戻ってくる型がDataFrame
という非常に抽象的な型であるため、実装をよく見ないと実際どのようなスキーマのDataFrame
が戻ってきているかわからず、当然静的な型チェックも効きません。
例えば以下のような定義はすべてコンパイルを通過します。
// ageではなくnameを返してしまっているが、コンパイルできる scala> def collectAge(d: DataFrame): DataFrame = d.select($"name") // 存在しないフィールドを得るメソッドもコンパイルは通過する scala> def collectAge(d: DataFrame): DataFrame = d.select($"xxx") // 実行時にエラーとなる。 scala> collectAge(people) org.apache.spark.sql.AnalysisException: cannot resolve 'xxx' given input columns age, name;
DataFrame
に対する操作が複雑になればなるほど、今扱っているDataFrame
が一体どのようなものなのか、正しくスキーマを守れているのか、といったことがわかなくなります。
Datasetの例
Dataset
はこれを解決します。
まずはDataset
を作ってみましょう。DataFrame.as
によりDataset
に変換できます。
scala> val people: Dataset[Person] = sqlContext.read.json(path).as[Person]
次に、DataFrame
の例のときと同様にageを取り出す関数の定義です。
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.age)
見てお分かりだと思いますが、戻り値のDataset
がどのような型を持っているか明示されています。
これにより、DataFrame
の時に問題になった間違ったメソッド定義はすべてコンパイル時にチェックされます。
// 戻り値の型と一致しないのでエラー scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name) <console>:32: error: type mismatch; found : String required: Long def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name) // Personに定義されていないフィールドを参照しようとしているのでエラー scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx) <console>:32: error: value xxx is not a member of Person def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)
これにより、SparkSQLの最適化の恩恵を受けながら、静的な方チェックの恩恵を受けることにより、安全にプログラムを書くことが可能になりました。めでたしめでたし。
Kafkaのパーティンション数を決める時に考慮すること
kafkaのパーティション数を選択する際に考慮する点について、kafka作者のブログにまとめられていました。
自身の理解のために、ちょっとまとめてみようと思います。
※この記事はあくまでも個人的なまとめですので、作者の原文に一度目を通すことをオススメします。
概要
考慮する点は、ざっくりまとめると以下があります。
必要とされるスループット
パーティションを追加することは、メッセージのproducerとconsumerの並列化につながるのでスループット向上が期待できます。
なら、max(t/p, t/c)
のパーティションを設定する必要があります。
将来的に必要となりそうなパーティション数
設定しようとしているパーティション数が現在のスループットを満たすとしても、近い将来増加するだろうなという見込み何ある場合は、多めに設定しましょう。
なぜなら、パーティションをあとで増やすと、あるキーに割り振られるパーティションが変わってしまうためです。
ここからは自分の理解ですが、例えば、key=xというメッセージと、その次にkey=yというメッセージを受け、HashPartitionerのもとではどちらもpartition=1に割り振られていたとします。
この後、パーティション数を増やすと、xが割り当てられるパーティションの値が変わってしまい、key=xのメッセージを再度受けると別のパーティションに割り当てられてしまうため、key=xの次にkey=yが来る、というパーティション内で保障された順序同一性が崩れてしまう、ということでしょう。
kafkaの管理するファイルディスクリプタの数
kafkaは、パーティション1つにつきファイルを2つオープンする(データファイル、インデックスファイル)ので、osのファイルディスクリプタ上限を超えないように注意しましょう。
ノード障害からの復旧時間
kafkaのbrokerが停止した場合、そのbrokerがリーダーとなっていたパーティションは別のサーバーがリーダーとして振舞うようになるのですが、brokerの停止の仕方によって動きが異なります。
graceful shutdown
graceful shutdown(おそらくkafka-server-stop.shによる停止のこと)した場合、パーティション移動は一つずつ行われるので数msのダウンタイムで済む。また、ログをディスクに一旦書き出すことで再起動時のログリカバリプロセスを省くことができ、再起動を素早くできるメリットもあると公式ドキュメントにあります。
hard kill (kill -9)
強制終了した場合、停止したbrokerのパーティションはすべて同時にダウンしてしまいます。1パーティションのリーダー移動に5ms程度かかるので、1000パーティションあった場合は5秒のダウンタイムとなってしまいます。
更に停止したbrokerがcontroller(パーティションリーダーの管理を司る特別なbroker)だった場合、新しいcontrollerがZookeeperからメタデータを取得するために、2ms * パーティション数の時間が余計にかかります。
レプリケーションに伴う負荷
Kafkaはコミット(すべてのレプリケーションbrokerにメッセージが保存されること)されたメッセージのみconsumerに見せるのですが、レプリケーションはシングルスレッドで行われるので、1台のbrokerのレプリケーション数が多いほどレイテンシに影響します。
例えば、2台構成で2000パーティションある場合、片方は1000パーティションのレプリケーションを行うので、結果としてだいたい20msスループットに影響します。これはサーバー台数を増やして1台あたりのレプリケーション数を削減することで軽減できます。
とはいえ、1メッセージは1パーティションにしか属さないので、1メッセージを受けた後に実行されるレプリケーションも1台あたり1回と考えられるため、レプリケーションよりも速いペースでメッセージが来ない限りは詰まらないんではとも思いました。
作者によると、パーティション数は100 * broker数 * レプリケーション数
に収めるのが良いそうです。
Producerのバッファリングメッセージ
producerは毎回メッセージをbrokerに送るのではなくある程度バッファリングしてから送るのですが、このバッファリングはパーティションごとに行われるため、パーティション数を増加させるとproducerが全体でバッファリングするメッセージも増えることになります。
もし、事前に設定されたバッファリングに使えるメモリをオーバーしてしまった場合、新規の受信をブロックするかメッセージを落とすことになってしまいます。
これに関係のありそうなproducerのパラメータは以下。
- buffer.memory:producerがバッファリングに使えるメモリ
- block.on.buffer.full:trueの場合、バッファリングするメッセージ量がbuffer.memoryを超えたら受信をブロックする。falseなら例外投げる。
- queue.buffering.max.messages:バッファに積める最大メッセージ量。
Sum up
いろいろ調べながら書いたので結構勉強なりました。Kafkaのトピック設計はなかなか奥が深そうですね。
spark-summit 2014のSpark Streamingのお題をやってみる
最近はSparkにお熱ということで、spark-summit 2014で催されたSpark StreamingでTwitterのタイムラインからハッシュタグの出現頻度を抽出する、というお題をやってみたいとおもいます。
spark-summit 2014のhands on資料
Stream Processing w/ Spark Streaming
ソースコード
spark-summit 2014 spark streaming hands on
以上。次はKafkaからデータ流してナントカカントカ、というのもやってみたい。
Go占い
あけましておめでとうございます。
一昨年はpython, 去年はHaskellだったので今年はGoで占いします。
以下の.goを作ります。
// lottery.go package main import ( "fmt" "math/rand" "time" ) func main() { rand.Seed(time.Now().Unix()) values := []string{"大凶", "凶", "大吉", "中吉", "小吉", "吉", "マジ吉"} fmt.Println(values[rand.Intn(len(values))]) }
実行してみます。
$ go run lottery.go 吉
まぁまぁですね!今年もよろしくお願いします。
PyconJP 2014に参加した
毎年恒例pyconに参加しました。
今年は例年に比べ機械学習や数理系の話題が多く、自分自身も業務で最近はこの辺に触ることが多かったので嬉しい内容でした。
内容は、1セッションの時間が30分ということもあり、使っているライブラリの紹介や入門内容的なものが多かったですが、それでもどんなライブラリを使っているか知れてよかったです。
機械学習系でよく使われていそうだったライブラリ
numpy
高速な行列計算が便利なライブラリ。
以下のプレゼンでは、1から1億までの数を合計するプログラムが、素のpythonと比べて80倍早かった(Cと同等)とのこと。
Effective numerical computation in num py and scipy
scipy
sparse matrixなど、便利なデータ構造がある。距離計算関数なども。
matplotlib
グラフ描画ライブラリ。APIがMATLABに似ているとのこと。プロジェクトは非常に成熟・安定しているとのこと。
scikit-learn
機械学習ライブラリ。機械学習系のライブラリは他にもいくつかありそうでしたが、これがデファクトという感じでした。
メリットは、
- 使い方が簡単
- 比較的高速
- メジャーなアルゴリズムはだいたい網羅している
- プロジェクトが活発
pandas
クロス集計など、行列計算を楽にするライブラリ。RのData.Frameに似ているらしい。scikit-learnと相性が悪い、という人も一人だけいた(詳しいことはわかりませんでしたが)。
anaconda
これらデータ分析に必要なライブラリを一挙にインストールするためのパッケージ。numpyやscipyのコンパイルは大変なので(何かしらハマりどころが多い)、これを使うと楽らしい。
標準モジュール
itertools
permutationやgroupbyなど、便利なモジュール多いです。
collections
defaultdictには毎日のようにお世話になってます。Counterとかも便利だそうです。
statistics
python3.4から新しく入ったライブラリ。平均や標準偏差などの統計量を計算するライブラリです。
iPython Notebook
プレゼンのツールとして, iPython Notebookを使っている人が多かったです。グラフとコードを一体にして見せれるので、分析系の人たちには重宝されていそう。
その他面白かったもの
Deep Learningと画像認識
「Deep learning for Image Recognition in Python」というタイトルでHideki Tanakaさんの発表でした。
Deep Learning for Image Recognition in Python
機械学習の精度を競うKaagleというウェブサイトがあり、そこで犬と猫の画像を自動判定する課題に挑戦した。
- 最初は、画像認識のための一般的な特徴量であるHaar-like featuresを使った。
- 60%くらいの精度しか出ず。しかしながら、Kaagleでのトップランナーは99%近い精度を出している。
- 最初は、画像認識のための一般的な特徴量であるHaar-like featuresを使った。
トップの連中はどうやらDeep learningという手法を用いているらしい。
- Deep Learning (Pre-Trained Networks)を使ってみたところ95%まで急上昇した。
後半に公開された会場限定のスライドが非常に面白く、初日のキーノートと並んで最も面白く、記憶に残る楽しいプレゼンでした。
Oktavia
Oktaviaという全文検索エンジンのプレゼンがありました。
従来の転置インデックスではなく、「高速文字列解析の世界」にあったFM-Indexというアルゴリズムを採用したという話でした。
- 検索エンジンのインデックスとしては、一般的には転置インデックスが用いられる。
日本語や中国語などアジア系の言語は単純に単語に分割できない。
そこで、FM-Index
- 「高速文字列解析の世界」で紹介されていた。
- 検索前に単語を分割する必要がない。
- 圧縮インデックスファイルを使った検索アルゴリズムの中でも最速。
- インデックスファイルからオリジナルのドキュメントを復元できる。
- 参考
キーノート(1日目)
スピーカーはreqeustsの作者であるKenneth Reitzさん。
python2, python3のコミュニティの分断について語られました。
個人的には、python2はこれ以上進化しないとわかっているわけだし、ライブラリのpython3対応もかなり進んでいるので、あとは趣味レベルを越えたビジネスでの事例が出てくれば自然に移行していくんじゃないかなと楽観的に見てます。
事例が少ないだけに、今python3を採用すれば先進的なブランドイメージが得られるかもしれないですね。
以上。運営の皆様、お疲れ様でした!
haskellの勉強として、twitter apiライブラリを書いてみた
すごいH本をひと通り読んだので次は自分でコードを書いてみようと思い、Twitter APIのライブラリを適当な感じで書いてみました。
Attsun1031/birdwatcher · GitHub
使い方
import Web.Birdwatcher main = do user <- users_show [("screen_name", "__Attsun__")] putStrLn $ show user
感想
- モナドむずい
- Aesonが便利(型クラスの仕組みのおかげ?)
- ライブラリコードを読んだりcabalを設定したりするのに時間を多く使ってしまったので、使った時間の割にはコード量が少なかった。
レコードのアクセサ関数で困った
レコードとは以下の様な構造体のようなもの。
data Point = Pt {x :: Integer, y :: Integer}
レコードを定義すると、フィールドへのアクセサ関数x, yが自動的に定義されるわけだけど、アクセサ関数の名前がだぶってしまうので同じ名前のフィールド名を持つレコードを同じ名前空間に複数おけない。
例えば自分の作ったライブラリの場合、UsersShowとUserTimelineのどちらにもidという名前があるため同じ名前空間におけない。とりあえずmoduleを分けるという手で対処しているが、分けたくないってケースもあるだろうし、どうやってやりくりするんだろ?