サナギわさわさ.json

サナギさんとキルミーベイベーとプログラミングが好きです

Apache Sparkについてのメモ

大規模バッチ処理が必要になったので、名前だけ聞いた事はあっても詳細を知らなかったApache Sparkについて簡単に調べました。

参考サイト

http://spark.apache.org/talks/overview.pdf

Apache Sparkってどんなものか見てみる(その1 - 夢とガラクタの集積場

Resilient Distributed Datasetsに関する論文まとめ(1章〜5章 - 夢とガラクタの集積場

spark_rdd.md · GitHub

Apache Spark

  • インメモリな分散データ処理プラットフォーム
  • データをメモリに保持するため、ストレージから入力を受けストレージに出力するMapReduceと比較して、機械学習やグラフ描画などの同じデータを繰り返し用いるジョブに対して強い
    • 初回ロード時は重くなるが、それ以降はメモリ上から読み込むためHadoopより速度が上がる。機械学習などのイテレーション複数回走る処理ではかなり差が出る。
    • 必要なメモリ量はHadoopよりも多くなる
  • Resilient Distributed Datasets(RDDs)を採用する事で、Hadoopの耐障害性、データ局所性、スケーラビリティを保持しつつメモリ上へのデータ保持を行っている。

Resilient Distributed Datasets(RDDs)

概要
  • 耐障害性を確保した上でメモリ上での計算を可能にするための読み取り専用分散データセット
  • 効率的な耐障害性の確保のために、「荒い変換のみ可能」な制限されたデータセットとして提供されている。(例:map, filter, join)
  • RDDsに対して荒い変換を行うと、RDDs内に存在する全てのデータに対して同じ演算が発生する (一斉書き込みのみをサポート)
  • 実データそのものを記録するのではなく、「データに対して行った変換」を記録する事で、効率的な耐障害性を実現している

プログラミングインタフェース概要

  • RDDsの概念を実装したインタフェースはScalaで提供される
  • RDDsはまず静的ストレージに対するtransformation(map,filterなど)で生成される
  • 生成されたRDDsに対して値を返すアクションやデータ出力のアクションを実行可能
  • 生成したRDDsをその後の処理で再利用したい場合は、persistメソッドを呼び出す事で対象のRDDsをメモリ上に保持できる
    • メモリ上に収まらない場合はディスクも併用
    • persistメソッドを呼ぶ際に、ディスクのみに保存するか、マシン間レプリケーションを行うかの配置選択も可能

RDDsの主な利点(vs分散共有メモリシステム)

  • 一括書き込みのみを認めているため、より効果的な耐障害性が提供される
    • データロストの場合でも、データ変換記録をたどって復旧が可能なためチェックポイントのオーバーヘッドが発生しない
    • パーティションがロストした場合でもプログラム全体のロールバックが必要なく、異なるノードでの並列計算で復旧できる
  • データが読み取り専用であるため、遅いタスクのコピーを実行する事で実行時のパフォーマンス向上が可能 (分散共有メモリシステムでは、2箇所から更新が来た場合競合が発生するためタスクのコピーが不可能)
  • メモリが不十分になった際に、スキャンをメモリ上で優先実行しそれ以外をディスクに移行する事でパフォーマンス低下を極力防げる

RDDsが適するケース

  • データ全体に対して一括操作を実行する場合に適している
  • 状態を管理しながら短いスパンで同期して処理を進めていく必要があるケースには適していない

RDDsの表現方法

上述した通り、RDDsは必ずしも実データそのものを保持しておらず、「データに対して行った変換」を保持する抽象的なモデルである。
RDDsではグラフベースのRDD表現によって様々なtransformationを追跡可能にしている。RDDに付与される5つの情報を以下に示す。

ここで重要なのがRDD間の依存関係をどのように表現するかということだが、依存を以下の2つに分類することでシンプルに依存が表現可能になっている。

  • 狭い依存:親RDDが子RDDに最大1つまでしか参照されない
  • 広い依存:親RDD複数の子RDDに参照される

この依存定義は以下の2つの理由で便利である。

  • 狭い依存ではクラスタノードを全て親のパーティションで実行可能であり、すなわちパイプラインを実行可能という事を意味する。(広い依存では全子ノードの親RDDが使用可能である必要が有る)
  • ノード障害時に、狭い依存の場合はロストしたパーティションのみを再計算すれば良い。広い依存の場合はノード間を含めた完全再計算が求められるケースがある。

Spark Streaming

  • Sparkはあくまでバッチ処理フレームワーク
  • しかしデータを超細切れなバッチの系列として処理する事で、ストリームコンピューティングを実現する事ができる。これがSpark Streamingである。
  • これによって、従来はバッチ処理とストリーム処理を別々に実装し運用しなければならなかったのが、同じ基盤上で走らせる事ができるようになる。(らしい)
  • だが、本質的にはやはりストリーム処理とバッチ処理は異なり、StormとSpark Streamingの違いもそこにあるらしい。その辺は今度まとめる

以上。

協調フィルタリングのメモ

レコメンドエンジンを作りたくなったので、協調フィルタリングについて簡単に調べました。今回はメモリベースの協調フィルタリングにのみ言及します。数学的な式などは省いており、概念的なメモだけです。
参考リンク: http://japan.zdnet.com/web/sp_08ec/20375385/
http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866
http://www.kamishima.net/archive/recsysdoc.pdf

協調フィルタリングとは

ユーザーの行動履歴からレコメンド内容を決める手法。
ユーザーの行動履歴のみからレコメンドを行えるのでアイテムの詳細情報を事前インプットしておく必要がない。そのため人が想定しないレコメンドができ、また半自動化ができる。
ユーザーベース協調フィルタリングとアイテムベース協調フィルタリングに分類される。

ユーザーベース協調フィルタリング

ユーザーの行動履歴を基に類似ユーザーを判定し、類似ユーザーがチェックしたアイテムをレコメンドする。

アイテムベース協調フィルタリング

ユーザーの行動履歴を基に類似アイテムを判定し、レコメンドする。

ユーザーベース協調フィルタリングには以下の欠点があるため、アイテムベース協調フィルタリングの方が用いられるケースは多い。(たぶん)

ユーザーベース協調フィルタリングの主な欠点

  • ユーザーが1人増えるごとにユーザー数ぶんの類似度計算を行う必要があり、処理が重くなる (アイテムベースの場合は事前にアイテム同士の類似度計算を行っておく事で処理を軽くできる)
  • 対象となるユーザーの履歴が溜まっていない場合にレコメンドができない
  • アイテムベースと比較して外れ値に影響されやすい。

類似度計算の代表的手法

  • ユークリッド距離
  • ピアソン相関係数(相関を取るのでデータが正規化されていない場合に有効)
  • コサイン類似性(アイテムベース類似性で良く使われる)

アイテムベースにしてもユーザーベースにしてもデータ量は非常に大きくなるので、類似度計算は最大のボトルネックになる。

レコメンドの精度を上げるために

以下の工夫が代表的。どんなサービスかによって細かいチューニングの方法は変わる。

  • 履歴を複数種類抽出し、それぞれに重み付けを行う(閲覧・興味・購入など)
  • 異なる履歴から複数種類のレコメンドを算出し、場面によって使い分ける(例:閲覧ベースレコメンドと興味ベースレコメンド)
  • アイテムの類似度をそのままレコメンド率に用いるのではなく、類似度上位10個の中からそれぞれ10%ずつ出すなどの平滑化処理を行う
  • 外れ値の除外(同一ユーザーからの大量アクセス・レコメンド機能を介したアクセスなどは除外するのが一般的)
  • メモリベースとコンテンツベースのハイブリッド (評価が十分に集まっていないアイテムに対してもレコメンドを出したい場合などに特に有効)

ElasticSearchの良く使うcurlコマンドまとめ

ElasticSearchの良く使うcurlコマンドをまとめました。良かったら使ってください。
コマンドの詳細な説明は書いていませんが、キーワードで調べればすぐ分かるかと思います。

データ投入系

スキーマの登録

curl -XPOST http://localhost:9200/testindex -d @mapper.json

スキーマの追加

curl -XPUT 'http://localhost:9200/testindex/_mapping/test' -d '
{
    "properties" : {
      "huga" : {
        "type" : "long"
      },
      "hoge" : {
        "type" : "string"
      }
    }
}'

スキーマの確認

http://localhost:9200/testindex/_mapping?pretty

データの登録

curl -XPOST http://localhost:9200/_bulk --data-binary @data.json >/dev/null

データ削除系

インデックスごと削除

curl -XDELETE 'http://localhost:9200/testindex'

データだけ削除(DELETE *的な)

curl -XDELETE 'http://localhost:9200/testindex/test/_query' -d '{
    "query" : {
        "match_all" : { }
    }
}'

特定条件データだけ削除

curl -XDELETE 'http://localhost:9200/testindex/test/_query' -d '{
    "query" : {
        "term" : {"hoge" : 1}
    }
}'

インデックスの設定変更系

エイリアスを設定

curl -XPOST 'http://localhost:9200/_aliases' -d '
{
  "actions" : [
    {"add": {"index": "test_index", "alias": "test_alias" }}
  ]
}'

レプリカシャードの数を変更

curl -XPUT http://localhost:9200/testindex/_settings -d '
{
    "index" : {
        "number_of_replicas" : 0
    }
}'

ソフトコミットの頻度を変更

curl -XPUT http://localhost:9200/testindex/_settings -d '
{
    "index" : {
        "refresh_interval" :  "30s"
    }
}'

Warmerを登録

curl -XPUT localhost:9200/testindex/_warmer/warmer_1 -d '{
    "query" : {
        "match_all" : {}
    },
    "aggs" : {
        "aggs_1" : {
            "terms" : {
                "field" : "field"
            }
        }
    }
}'

クラスターの設定変更系

シャードの自動移動設定の変更

curl -XPUT http://localhost:9200/_cluster/settings -d '{
 "persistent": {"cluster.routing.allocation.enable": "none"}
}'
curl -XPUT http://localhost:9200/_cluster/settings -d '{
 "persistent": {"cluster.routing.allocation.enable": "all"}
}'

マスターノードの決定に必要なノード数を変更

curl -XPUT http://localhost:9200/_cluster/settings -d '{
 "persistent": {"discovery.zen.minimum_master_nodes": 4}
}'

シャードを手動で移動

curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
    "commands" : [ {
        "move" :
            {
              "index" : "test", "shard" : 0,
              "from_node" : "node1", "to_node" : "node2"
            }
        }
    ]
}'

Unassignedになっているシャードを再配置

curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
    "commands" : [ {
        {
          "allocate" : {
              "index" : "test", "shard" : 1, "node" : "node3"
          }
        }
    ]
}'

以上です。何か間違いありましたら教えてください。