The Dabsong Conshirtoe

技術系の話を主にします。

Spark1.6 - Datasetの導入とメモリ管理の変更

spark1.6がリリースされました。

Spark 1.6.0 released | Apache Spark

詳しい変更点は上記を見ていただくとして、ここではDatasetとメモリ管理方式の変更について紹介します。

メモリ管理の変更

Spark1.6ではメモリ(ヒープ)の管理が柔軟になり、より効率的にメモリを使用できるようになりました。

1.5までの世界

Sparkではメモリの使用領域が主に2つの領域に分かれています。

ストレージ領域

RDDをメモリにキャッシュする際に使われる領域です。

実行領域

シャッフルなど、タスクの実行に使われる領域です。

従来、この2つの領域は厳密に分断されていたため、チューニングを誤ると全体としてはメモリが余っているにもかかわらず、無駄なディスク書き込みが発生するなどしてしまいました。

1.6からの世界

1.6ではこの管理方式に変更が加えられ、お互いがお互いの領域を使用することが可能になりました。

また、ストレージ領域がspark.memory.storageFractionを超えてメモリを使用している場合、実行領域はストレージ領域の使用メモリがspark.memory.storageFractionを上回っている限り横取りすることができるそうです。(逆は不可能)

重要な設定値

spark.memory.fraction

JVMのうち、Sparkが使用できるメモリの割合を示す。

spark.memory.storageFraction

spark.memory.fractionのうち、ストレージ領域に最低どれだけ確保できるかを示す。

spark.memory.useLegacyMode

trueに設定することで、1.5以前の古いメモリ管理方法を採用できる。

Dataset API

1.6で初めて追加された試験的な機能です。pythonは未サポート。

Datasetを使うことで、素のRDDがもたらす静的な型チェックと、DataFrameがもたらす実行の最適化の両方の恩恵が受けられます。

型付けについて詳しくみてみましょう。

静的な型チェック

静的な型チェックが得られるとはどういうことでしょうか?このメリットを理解するには、まずDataFrameの問題点を理解する必要があります。

DataFrameの例

例えば、名前(String)と年齢(Long)の2つのフィールドを持つPersonスキーマを外部から読み込むとしましょう。(このjsonは、sparkのexapmleディレクトリに含まれているものです)

scala> case class Person(name: String, age: Long)
scala> val path = "examples/src/main/resources/people.json"
scala> val people: DataFrame = sqlContext.read.json(path)

次は、このpeopleからage列のみを取り出すメソッドを定義したいとします。

scala> def collectAge(d: DataFrame): DataFrame = d.select($"age")

一見問題ないように見えますが、実はここに危険があります。

まず、戻ってくる型がDataFrameという非常に抽象的な型であるため、実装をよく見ないと実際どのようなスキーマDataFrameが戻ってきているかわからず、当然静的な型チェックも効きません。

例えば以下のような定義はすべてコンパイルを通過します。

// ageではなくnameを返してしまっているが、コンパイルできる
scala> def collectAge(d: DataFrame): DataFrame = d.select($"name")
 
// 存在しないフィールドを得るメソッドもコンパイルは通過する
scala> def collectAge(d: DataFrame): DataFrame = d.select($"xxx")
// 実行時にエラーとなる。
scala> collectAge(people)
org.apache.spark.sql.AnalysisException: cannot resolve 'xxx' given input columns age, name;

DataFrameに対する操作が複雑になればなるほど、今扱っているDataFrameが一体どのようなものなのか、正しくスキーマを守れているのか、といったことがわかなくなります。

Datasetの例

Datasetはこれを解決します。

まずはDatasetを作ってみましょう。DataFrame.asによりDatasetに変換できます。

scala> val people: Dataset[Person] = sqlContext.read.json(path).as[Person]

次に、DataFrameの例のときと同様にageを取り出す関数の定義です。

scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.age)

見てお分かりだと思いますが、戻り値のDatasetがどのような型を持っているか明示されています。

これにより、DataFrameの時に問題になった間違ったメソッド定義はすべてコンパイル時にチェックされます。

// 戻り値の型と一致しないのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
<console>:32: error: type mismatch;
 found   : String
 required: Long
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.name)
 
// Personに定義されていないフィールドを参照しようとしているのでエラー
scala> def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)
<console>:32: error: value xxx is not a member of Person
         def collectAge(d: Dataset[Person]): Dataset[Long] = d.map(_.xxx)

これにより、SparkSQLの最適化の恩恵を受けながら、静的な方チェックの恩恵を受けることにより、安全にプログラムを書くことが可能になりました。めでたしめでたし。

Scala占い

あけましておめでとうございます。 恒例の占い企画です。 2013年はPython、2014年はHaskell、2015年はGoで占いをしました。今年はScalaです。

scala> import scala.util.Random
import scala.util.Random

scala> Random.shuffle(List("大凶", "凶", "大吉", "中吉", "小吉", "吉", "マジ吉")).head
res0: String = 大吉

素晴らしい!!

今年もよろしくお願いします。

Kafkaのパーティンション数を決める時に考慮すること

kafkaのパーティション数を選択する際に考慮する点について、kafka作者のブログにまとめられていました。

blog.confluent.io

自身の理解のために、ちょっとまとめてみようと思います。

※この記事はあくまでも個人的なまとめですので、作者の原文に一度目を通すことをオススメします。

概要

考慮する点は、ざっくりまとめると以下があります。

必要とされるスループット

パーティションを追加することは、メッセージのproducerとconsumerの並列化につながるのでスループット向上が期待できます。

なら、max(t/p, t/c)パーティションを設定する必要があります。

将来的に必要となりそうなパーティション

設定しようとしているパーティション数が現在のスループットを満たすとしても、近い将来増加するだろうなという見込み何ある場合は、多めに設定しましょう。

なぜなら、パーティションをあとで増やすと、あるキーに割り振られるパーティションが変わってしまうためです。

ここからは自分の理解ですが、例えば、key=xというメッセージと、その次にkey=yというメッセージを受け、HashPartitionerのもとではどちらもpartition=1に割り振られていたとします。

この後、パーティション数を増やすと、xが割り当てられるパーティションの値が変わってしまい、key=xのメッセージを再度受けると別のパーティションに割り当てられてしまうため、key=xの次にkey=yが来る、というパーティション内で保障された順序同一性が崩れてしまう、ということでしょう。

kafkaの管理するファイルディスクリプタの数

kafkaは、パーティション1つにつきファイルを2つオープンする(データファイル、インデックスファイル)ので、osのファイルディスクリプタ上限を超えないように注意しましょう。

ノード障害からの復旧時間

kafkaのbrokerが停止した場合、そのbrokerがリーダーとなっていたパーティションは別のサーバーがリーダーとして振舞うようになるのですが、brokerの停止の仕方によって動きが異なります。

graceful shutdown

graceful shutdown(おそらくkafka-server-stop.shによる停止のこと)した場合、パーティション移動は一つずつ行われるので数msのダウンタイムで済む。また、ログをディスクに一旦書き出すことで再起動時のログリカバリプロセスを省くことができ、再起動を素早くできるメリットもあると公式ドキュメントにあります。

hard kill (kill -9)

強制終了した場合、停止したbrokerのパーティションはすべて同時にダウンしてしまいます。1パーティションのリーダー移動に5ms程度かかるので、1000パーティションあった場合は5秒のダウンタイムとなってしまいます。

更に停止したbrokerがcontroller(パーティションリーダーの管理を司る特別なbroker)だった場合、新しいcontrollerがZookeeperからメタデータを取得するために、2ms * パーティション数の時間が余計にかかります。

レプリケーションに伴う負荷

Kafkaはコミット(すべてのレプリケーションbrokerにメッセージが保存されること)されたメッセージのみconsumerに見せるのですが、レプリケーションはシングルスレッドで行われるので、1台のbrokerのレプリケーション数が多いほどレイテンシに影響します。

例えば、2台構成で2000パーティションある場合、片方は1000パーティションレプリケーションを行うので、結果としてだいたい20msスループットに影響します。これはサーバー台数を増やして1台あたりのレプリケーション数を削減することで軽減できます。

とはいえ、1メッセージは1パーティションにしか属さないので、1メッセージを受けた後に実行されるレプリケーションも1台あたり1回と考えられるため、レプリケーションよりも速いペースでメッセージが来ない限りは詰まらないんではとも思いました。

作者によると、パーティション数は100 * broker数 * レプリケーション数に収めるのが良いそうです。

Producerのバッファリングメッセージ

producerは毎回メッセージをbrokerに送るのではなくある程度バッファリングしてから送るのですが、このバッファリングはパーティションごとに行われるため、パーティション数を増加させるとproducerが全体でバッファリングするメッセージも増えることになります。

もし、事前に設定されたバッファリングに使えるメモリをオーバーしてしまった場合、新規の受信をブロックするかメッセージを落とすことになってしまいます。

これに関係のありそうなproducerのパラメータは以下。

  • buffer.memory:producerがバッファリングに使えるメモリ
  • block.on.buffer.full:trueの場合、バッファリングするメッセージ量がbuffer.memoryを超えたら受信をブロックする。falseなら例外投げる。
  • queue.buffering.max.messages:バッファに積める最大メッセージ量。

Sum up

いろいろ調べながら書いたので結構勉強なりました。Kafkaのトピック設計はなかなか奥が深そうですね。

spark-summit 2014のSpark Streamingのお題をやってみる

最近はSparkにお熱ということで、spark-summit 2014で催されたSpark StreamingでTwitterのタイムラインからハッシュタグの出現頻度を抽出する、というお題をやってみたいとおもいます。

spark-summit 2014のhands on資料

Stream Processing w/ Spark Streaming

ソースコード

spark-summit 2014 spark streaming hands on

以上。次はKafkaからデータ流してナントカカントカ、というのもやってみたい。

Go占い

あけましておめでとうございます。

一昨年はpython, 去年はHaskellだったので今年はGoで占いします。

以下の.goを作ります。

// lottery.go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	rand.Seed(time.Now().Unix())
	values := []string{"大凶", "凶", "大吉", "中吉", "小吉", "吉", "マジ吉"}
	fmt.Println(values[rand.Intn(len(values))])
}

実行してみます。

$ go run lottery.go
吉

まぁまぁですね!今年もよろしくお願いします。

PyconJP 2014に参加した

毎年恒例pyconに参加しました。

今年は例年に比べ機械学習や数理系の話題が多く、自分自身も業務で最近はこの辺に触ることが多かったので嬉しい内容でした。

内容は、1セッションの時間が30分ということもあり、使っているライブラリの紹介や入門内容的なものが多かったですが、それでもどんなライブラリを使っているか知れてよかったです。

機械学習系でよく使われていそうだったライブラリ

numpy

高速な行列計算が便利なライブラリ。

以下のプレゼンでは、1から1億までの数を合計するプログラムが、素のpythonと比べて80倍早かった(Cと同等)とのこと。

Effective numerical computation in num py and scipy

scipy

sparse matrixなど、便利なデータ構造がある。距離計算関数なども。

matplotlib

グラフ描画ライブラリ。APIMATLABに似ているとのこと。プロジェクトは非常に成熟・安定しているとのこと。

scikit-learn

機械学習ライブラリ。機械学習系のライブラリは他にもいくつかありそうでしたが、これがデファクトという感じでした。

メリットは、

  • 使い方が簡単
  • 比較的高速
  • メジャーなアルゴリズムはだいたい網羅している
  • プロジェクトが活発

pandas

クロス集計など、行列計算を楽にするライブラリ。RのData.Frameに似ているらしい。scikit-learnと相性が悪い、という人も一人だけいた(詳しいことはわかりませんでしたが)。

anaconda

これらデータ分析に必要なライブラリを一挙にインストールするためのパッケージ。numpyやscipyのコンパイルは大変なので(何かしらハマりどころが多い)、これを使うと楽らしい。

標準モジュール

itertools

permutationやgroupbyなど、便利なモジュール多いです。

collections

defaultdictには毎日のようにお世話になってます。Counterとかも便利だそうです。

statistics

python3.4から新しく入ったライブラリ。平均や標準偏差などの統計量を計算するライブラリです。

iPython Notebook

プレゼンのツールとして, iPython Notebookを使っている人が多かったです。グラフとコードを一体にして見せれるので、分析系の人たちには重宝されていそう。

その他面白かったもの

Deep Learningと画像認識

「Deep learning for Image Recognition in Python」というタイトルでHideki Tanakaさんの発表でした。

Deep Learning for Image Recognition in Python

  • 機械学習の精度を競うKaagleというウェブサイトがあり、そこで犬と猫の画像を自動判定する課題に挑戦した。

    • 最初は、画像認識のための一般的な特徴量であるHaar-like featuresを使った。
      • 60%くらいの精度しか出ず。しかしながら、Kaagleでのトップランナーは99%近い精度を出している。
  • トップの連中はどうやらDeep learningという手法を用いているらしい。

    • Deep Learning (Pre-Trained Networks)を使ってみたところ95%まで急上昇した。

後半に公開された会場限定のスライドが非常に面白く、初日のキーノートと並んで最も面白く、記憶に残る楽しいプレゼンでした。

Oktavia

Oktaviaという全文検索エンジンのプレゼンがありました。

従来の転置インデックスではなく、「高速文字列解析の世界」にあったFM-Indexというアルゴリズムを採用したという話でした。

  • 検索エンジンのインデックスとしては、一般的には転置インデックスが用いられる。
    • 日本語や中国語などアジア系の言語は単純に単語に分割できない。

      • 形態素解析を用いて分割し、転置インデックスを作る
        • 各言語に精通している必要がある。
        • 巨大な辞書が必要。
        • 検索ワードに対しても、同様の処理(形態素解析)を行う必要がある。
      • N-gram
        • インデックスファイルが巨大になりやすい。
    • そこで、FM-Index

      • 「高速文字列解析の世界」で紹介されていた。
      • 検索前に単語を分割する必要がない。
      • 圧縮インデックスファイルを使った検索アルゴリズムの中でも最速。
      • インデックスファイルからオリジナルのドキュメントを復元できる。
      • 参考

キーノート(1日目)

スピーカーはreqeustsの作者であるKenneth Reitzさん。

python2, python3のコミュニティの分断について語られました。

個人的には、python2はこれ以上進化しないとわかっているわけだし、ライブラリのpython3対応もかなり進んでいるので、あとは趣味レベルを越えたビジネスでの事例が出てくれば自然に移行していくんじゃないかなと楽観的に見てます。

事例が少ないだけに、今python3を採用すれば先進的なブランドイメージが得られるかもしれないですね。

以上。運営の皆様、お疲れ様でした!

haskellの勉強として、twitter apiライブラリを書いてみた

すごいH本をひと通り読んだので次は自分でコードを書いてみようと思い、Twitter APIのライブラリを適当な感じで書いてみました。

Attsun1031/birdwatcher · GitHub

使い方

import Web.Birdwatcher

main = do
  user <- users_show [("screen_name", "__Attsun__")]
  putStrLn $ show user

感想

  • モナドむずい
  • Aesonが便利(型クラスの仕組みのおかげ?)
  • ライブラリコードを読んだりcabalを設定したりするのに時間を多く使ってしまったので、使った時間の割にはコード量が少なかった。

レコードのアクセサ関数で困った

レコードとは以下の様な構造体のようなもの。

data Point = Pt {x :: Integer, y :: Integer}

レコードを定義すると、フィールドへのアクセサ関数x, yが自動的に定義されるわけだけど、アクセサ関数の名前がだぶってしまうので同じ名前のフィールド名を持つレコードを同じ名前空間に複数おけない。

例えば自分の作ったライブラリの場合、UsersShowとUserTimelineのどちらにもidという名前があるため同じ名前空間におけない。とりあえずmoduleを分けるという手で対処しているが、分けたくないってケースもあるだろうし、どうやってやりくりするんだろ?