Apache Sparkについてのメモ
大規模バッチ処理が必要になったので、名前だけ聞いた事はあっても詳細を知らなかったApache Sparkについて簡単に調べました。
参考サイト
http://spark.apache.org/talks/overview.pdf
Apache Sparkってどんなものか見てみる(その1 - 夢とガラクタの集積場
Resilient Distributed Datasetsに関する論文まとめ(1章〜5章 - 夢とガラクタの集積場
Apache Spark
- インメモリな分散データ処理プラットフォーム
- データをメモリに保持するため、ストレージから入力を受けストレージに出力するMapReduceと比較して、機械学習やグラフ描画などの同じデータを繰り返し用いるジョブに対して強い
- Resilient Distributed Datasets(RDDs)を採用する事で、Hadoopの耐障害性、データ局所性、スケーラビリティを保持しつつメモリ上へのデータ保持を行っている。
Resilient Distributed Datasets(RDDs)
概要
- 耐障害性を確保した上でメモリ上での計算を可能にするための読み取り専用分散データセット
- 効率的な耐障害性の確保のために、「荒い変換のみ可能」な制限されたデータセットとして提供されている。(例:map, filter, join)
- RDDsに対して荒い変換を行うと、RDDs内に存在する全てのデータに対して同じ演算が発生する (一斉書き込みのみをサポート)
- 実データそのものを記録するのではなく、「データに対して行った変換」を記録する事で、効率的な耐障害性を実現している
プログラミングインタフェース概要
- RDDsの概念を実装したインタフェースはScalaで提供される
- RDDsはまず静的ストレージに対するtransformation(map,filterなど)で生成される
- 生成されたRDDsに対して値を返すアクションやデータ出力のアクションを実行可能
- 生成したRDDsをその後の処理で再利用したい場合は、persistメソッドを呼び出す事で対象のRDDsをメモリ上に保持できる
RDDsの主な利点(vs分散共有メモリシステム)
- 一括書き込みのみを認めているため、より効果的な耐障害性が提供される
- データが読み取り専用であるため、遅いタスクのコピーを実行する事で実行時のパフォーマンス向上が可能 (分散共有メモリシステムでは、2箇所から更新が来た場合競合が発生するためタスクのコピーが不可能)
- メモリが不十分になった際に、スキャンをメモリ上で優先実行しそれ以外をディスクに移行する事でパフォーマンス低下を極力防げる
RDDsが適するケース
- データ全体に対して一括操作を実行する場合に適している
- 状態を管理しながら短いスパンで同期して処理を進めていく必要があるケースには適していない
RDDsの表現方法
上述した通り、RDDsは必ずしも実データそのものを保持しておらず、「データに対して行った変換」を保持する抽象的なモデルである。
RDDsではグラフベースのRDD表現によって様々なtransformationを追跡可能にしている。RDDに付与される5つの情報を以下に示す。
ここで重要なのがRDD間の依存関係をどのように表現するかということだが、依存を以下の2つに分類することでシンプルに依存が表現可能になっている。
この依存定義は以下の2つの理由で便利である。
- 狭い依存ではクラスタノードを全て親のパーティションで実行可能であり、すなわちパイプラインを実行可能という事を意味する。(広い依存では全子ノードの親RDDが使用可能である必要が有る)
- ノード障害時に、狭い依存の場合はロストしたパーティションのみを再計算すれば良い。広い依存の場合はノード間を含めた完全再計算が求められるケースがある。
Spark Streaming
- Sparkはあくまでバッチ処理用のフレームワーク
- しかしデータを超細切れなバッチの系列として処理する事で、ストリームコンピューティングを実現する事ができる。これがSpark Streamingである。
- これによって、従来はバッチ処理とストリーム処理を別々に実装し運用しなければならなかったのが、同じ基盤上で走らせる事ができるようになる。(らしい)
- だが、本質的にはやはりストリーム処理とバッチ処理は異なり、StormとSpark Streamingの違いもそこにあるらしい。その辺は今度まとめる
以上。
協調フィルタリングのメモ
レコメンドエンジンを作りたくなったので、協調フィルタリングについて簡単に調べました。今回はメモリベースの協調フィルタリングにのみ言及します。数学的な式などは省いており、概念的なメモだけです。
参考リンク:
http://japan.zdnet.com/web/sp_08ec/20375385/
http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866
http://www.kamishima.net/archive/recsysdoc.pdf
協調フィルタリングとは
ユーザーの行動履歴からレコメンド内容を決める手法。
ユーザーの行動履歴のみからレコメンドを行えるのでアイテムの詳細情報を事前インプットしておく必要がない。そのため人が想定しないレコメンドができ、また半自動化ができる。
ユーザーベース協調フィルタリングとアイテムベース協調フィルタリングに分類される。
ユーザーベース協調フィルタリング
ユーザーの行動履歴を基に類似ユーザーを判定し、類似ユーザーがチェックしたアイテムをレコメンドする。
アイテムベース協調フィルタリング
ユーザーの行動履歴を基に類似アイテムを判定し、レコメンドする。
ユーザーベース協調フィルタリングには以下の欠点があるため、アイテムベース協調フィルタリングの方が用いられるケースは多い。(たぶん)
ユーザーベース協調フィルタリングの主な欠点
- ユーザーが1人増えるごとにユーザー数ぶんの類似度計算を行う必要があり、処理が重くなる (アイテムベースの場合は事前にアイテム同士の類似度計算を行っておく事で処理を軽くできる)
- 対象となるユーザーの履歴が溜まっていない場合にレコメンドができない
- アイテムベースと比較して外れ値に影響されやすい。
類似度計算の代表的手法
- ユークリッド距離
- ピアソン相関係数(相関を取るのでデータが正規化されていない場合に有効)
- コサイン類似性(アイテムベース類似性で良く使われる)
アイテムベースにしてもユーザーベースにしてもデータ量は非常に大きくなるので、類似度計算は最大のボトルネックになる。
レコメンドの精度を上げるために
以下の工夫が代表的。どんなサービスかによって細かいチューニングの方法は変わる。
ElasticSearchの良く使うcurlコマンドまとめ
ElasticSearchの良く使うcurlコマンドをまとめました。良かったら使ってください。
コマンドの詳細な説明は書いていませんが、キーワードで調べればすぐ分かるかと思います。
データ投入系
スキーマの登録
curl -XPOST http://localhost:9200/testindex -d @mapper.json
スキーマの追加
curl -XPUT 'http://localhost:9200/testindex/_mapping/test' -d ' { "properties" : { "huga" : { "type" : "long" }, "hoge" : { "type" : "string" } } }'
スキーマの確認
http://localhost:9200/testindex/_mapping?pretty
データの登録
curl -XPOST http://localhost:9200/_bulk --data-binary @data.json >/dev/null
データ削除系
インデックスごと削除
curl -XDELETE 'http://localhost:9200/testindex'
データだけ削除(DELETE *的な)
curl -XDELETE 'http://localhost:9200/testindex/test/_query' -d '{ "query" : { "match_all" : { } } }'
特定条件データだけ削除
curl -XDELETE 'http://localhost:9200/testindex/test/_query' -d '{ "query" : { "term" : {"hoge" : 1} } }'
インデックスの設定変更系
エイリアスを設定
curl -XPOST 'http://localhost:9200/_aliases' -d ' { "actions" : [ {"add": {"index": "test_index", "alias": "test_alias" }} ] }'
レプリカシャードの数を変更
curl -XPUT http://localhost:9200/testindex/_settings -d ' { "index" : { "number_of_replicas" : 0 } }'
ソフトコミットの頻度を変更
curl -XPUT http://localhost:9200/testindex/_settings -d ' { "index" : { "refresh_interval" : "30s" } }'
Warmerを登録
curl -XPUT localhost:9200/testindex/_warmer/warmer_1 -d '{ "query" : { "match_all" : {} }, "aggs" : { "aggs_1" : { "terms" : { "field" : "field" } } } }'
クラスターの設定変更系
シャードの自動移動設定の変更
curl -XPUT http://localhost:9200/_cluster/settings -d '{ "persistent": {"cluster.routing.allocation.enable": "none"} }' curl -XPUT http://localhost:9200/_cluster/settings -d '{ "persistent": {"cluster.routing.allocation.enable": "all"} }'
マスターノードの決定に必要なノード数を変更
curl -XPUT http://localhost:9200/_cluster/settings -d '{ "persistent": {"discovery.zen.minimum_master_nodes": 4} }'
シャードを手動で移動
curl -XPOST 'localhost:9200/_cluster/reroute' -d '{ "commands" : [ { "move" : { "index" : "test", "shard" : 0, "from_node" : "node1", "to_node" : "node2" } } ] }'
Unassignedになっているシャードを再配置
curl -XPOST 'localhost:9200/_cluster/reroute' -d '{ "commands" : [ { { "allocate" : { "index" : "test", "shard" : 1, "node" : "node3" } } ] }'
以上です。何か間違いありましたら教えてください。