ElasticSearchでビッグデータ集計を試みて困った点その1:Java APIで何回も接続に行っていたらNoNodeAvailableExceptionで落ちる
前にも書いたのですが、ElasticSearchはまだ日本語の情報が少なく、はまった時にやや面倒というのが私の所感です。ElasticSearchを使って1000万件ぐらいのデータの集計をしようと試みていくつか困った点があったので、日本語の情報が無さそうだった物だけ備忘録として残しておきます。言語はJavaです。Pythonは苦手です。
まず、1000万件のデータをいっぺんに引っ張ってくると当然メモリが足りなくて落ちるので、以下のようなコードで10万件ずつ取ってくるようにしました。
//まずAggregationクエリで全体の件数確認 StatsBuilder aggregation = AggregationBuilders.stats("agg").field("id"); SearchRequestBuilder searchReq = client.prepareSearch("index_name"); searchReq.setTypes("test"); searchReq.setSize(0); searchReq.addAggregation(aggregation); SearchResponse searchRes = searchReq.execute().actionGet(); Stats stats = searchRes.getAggregations().get("agg"); long docCount = stats.getCount(); int offset = 0; int rows = 100000; while(true) { SearchRequestBuilder searchReq = client.prepareSearch("index_name"); searchReq.setTypes("test"); searchReq.setSize(rows); searchReq.setFrom(offset); searchReq.setQuery(QueryBuilders.matchAllQuery()); SearchResponse searchRes = searchReq.execute().actionGet(); for(Iterator<SearchHit> itr = searchRes.getHits().iterator(); itr.hasNext();) { //集計処理 offset++; } if (offset > docCount) { break; } }
すると、集計の途中(だいたい800万件ぐらい)でNoNodeAvailableException:No node availableで落ちる事象が頻発しました。
※ElasticSearchにポートフォワーディングで聞きに行っていたのが主原因かもしれませんが、今回はそこは変更できなかったので、他の対処策を考えました。
結局、searchReq.execute().actionGet();を呼ぶ時にNoNodeAvailableExceptionを拾ったら一定時間スリープするという単純な方法で一応解決しました。もっとスマートなやり方ありましたら教えてください。
SearchResponse searchRes; while (true) { try { searchRes = searchReq.execute().actionGet(); break; } catch (NoNodeAvailableException cont) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } }
ElasticSearchのJava APIの日本語記事がほとんど無い気がして困っています。皆さん何でやられてるんでしょうか?