Hi, all you Elasticsearch gurus.
I have millions of records to index through the Elasticsearch Java API.
The Elasticsearch cluster has three nodes (one acting as master).
My code snippet is below.
Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "MyClusterName").build();
TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300;
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(
        new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while ((readLine = br.readLine()) != null) {
    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json));
    bulkBuilderLength++;
    if (bulkBuilderLength % 1000 == 0) {
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if (bulkRes.hasFailures()) {
            logger.error("##### Bulk Request failure with error: "
                    + bulkRes.buildFailureMessage());
        }
    }
}
br.close();

if (bulkBuilder.numberOfActions() > 0) {
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if (bulkRes.hasFailures()) {
        logger.error("##### Bulk Request failure with error: "
                + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}
It works fine at first, but after tens of thousands of documents the performance degrades rapidly.
I have already tried changing the setting "refresh_interval" to -1 and "number_of_replicas" to 0.
However, performance still degrades in exactly the same way.
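For reference, a minimal sketch of how those two settings can be applied with the same generation of the Java API used above (the restore values at the end are assumptions, not taken from this post):

    // Sketch: relax refresh/replicas while bulk loading (ES 1.x-era API).
    client.admin().indices().prepareUpdateSettings("my_index_name")
            .setSettings(ImmutableSettings.settingsBuilder()
                    .put("index.refresh_interval", "-1")  // disable periodic refresh
                    .put("index.number_of_replicas", 0)   // no replicas during the load
                    .build())
            .execute().actionGet();

    // Restore sensible values once the load has finished (assumed defaults):
    client.admin().indices().prepareUpdateSettings("my_index_name")
            .setSettings(ImmutableSettings.settingsBuilder()
                    .put("index.refresh_interval", "1s")
                    .put("index.number_of_replicas", 1)
                    .build())
            .execute().actionGet();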
If I monitor the cluster state with bigdesk, the GC value hits 1 every second, as shown in the screenshot below.
Can anyone help me?
Thanks in advance.
=================== UPDATE ===========================
Finally, I have solved this problem. (See the answer below.)
The cause of the problem was that I missed recreating a new BulkRequestBuilder.
After I changed the code as below, the performance degradation never happened again.
Thank you very much.
Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "MyClusterName").build();
TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300;
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(
        new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while ((readLine = br.readLine()) != null) {
    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json));
    bulkBuilderLength++;
    if (bulkBuilderLength % 1000 == 0) {
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if (bulkRes.hasFailures()) {
            logger.error("##### Bulk Request failure with error: "
                    + bulkRes.buildFailureMessage());
        }
        bulkBuilder = client.prepareBulk(); // This line is my mistake and the solution !!!
    }
}
br.close();

if (bulkBuilder.numberOfActions() > 0) {
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if (bulkRes.hasFailures()) {
        logger.error("##### Bulk Request failure with error: "
                + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}

The problem here is that you do not recreate a new Bulk after the bulk execution.
This means that you were re-indexing the same first documents again and again: every call to execute() resent all the actions accumulated since the builder was created, so each bulk request grew larger than the last, which explains the slowdown and the GC pressure.
BTW, look at the BulkProcessor class. It is definitely better to use.
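For illustration, a minimal BulkProcessor sketch against the same 1.x-era API (the listener bodies, batch size, and concurrency settings are assumptions chosen to mirror the manual loop above):

    // BulkProcessor batches actions and recreates the underlying bulk request
    // after every flush, so the mistake above cannot happen.
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            logger.info("##### executing bulk of " + request.numberOfActions() + " actions");
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                logger.error("##### Bulk Request failure with error: "
                        + response.buildFailureMessage());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            logger.error("##### Bulk Request failed", failure);
        }
    })
    .setBulkActions(1000)       // flush every 1000 actions, like the manual loop
    .setConcurrentRequests(1)   // one bulk in flight while the next batch fills
    .build();

    // In the read loop, hand each index request to the processor:
    bulkProcessor.add(client.prepareIndex(index, type, id).setSource(json).request());

    // When done; close() flushes any remaining actions:
    bulkProcessor.close();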