// Elasticsearch 5.5.2 back-end Java API helper (for questions, contact QQ 842161530 to discuss).
package com.es.backStage; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutionException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.transport.client.PreBuiltTransportClient; 
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.trs.hybase.client.TRSException; public class ESClient { private TransportClient client; /** * init方法可以写在构造器完成初次初始化。 */ public ESClient() { // TODO Auto-generated constructor stub } /** * init 初始化方法需要最先调用,在这里设置了setting,ip,esport信息.例子见main * * @param IP * @param port * @param clusterName * @throws UnknownHostException */ @SuppressWarnings({ "resource", "unchecked" }) public void init(String IP, int port, String clusterName) throws UnknownHostException { // 设置集群名称 // 启动嗅探功能 Settings settings = Settings.builder().put("cluster.name", clusterName).put("client.transport.sniff", true) .build(); // 创建client client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(IP), port)); } /** * 判断集群中{index}是否存在 * * @param index * @return 存在(true)、不存在(false) */ public boolean indexExists(String index) { IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response = client.admin().indices().exists(request).actionGet(); if (response.isExists()) { return true; } return false; } /** * 直接创建索引 * * @param index * @return * @throws TRSException * @throws IOException */ public boolean createIndex(String index) throws TRSException, IOException { try { IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response = client.admin().indices().exists(request).actionGet(); if (response.isExists()) { return false; } else { client.admin().indices().prepareCreate(index).get(); return true; } } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 创建索引 * * @param index * @param shardCount(分片数) * @param repliceCount(副本数) * @return * @throws TRSException * @throws IOException */ public boolean createIndex(String index, int shardCount, int repliceCount) throws TRSException, IOException { try { IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response 
= client.admin().indices().exists(request).actionGet(); if (response.isExists()) { return false; } else { String source = "{\"type\":{\"properties\":{\"content\":{\"type\":\"string\"},\"id\":{\"type\":\"long\"},\"posttime\":{\"type\":\"date\",\"format\":\"dateOptionalTime\"},\"title\":{\"type\":\"string\"},\"example\":{\"type\":\"string\"}}}} "; client.admin().indices().prepareCreate(index).setSettings(Settings.builder() .put("index.number_of_shards", shardCount).put("index.number_of_replicas", repliceCount)) .addMapping("type", source).get(); return true; } } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 刷新所有index */ public boolean reFresh() { try { client.admin().indices().prepareRefresh().get(); return true; } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 刷新指定index * * @param index */ public boolean reFresh(String index) { try { client.admin().indices().prepareRefresh(index).get(); return true; } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 创建index的时候同时制定分片,副本和mapping,用逗号分隔属性和分词器 * * @param index * @param shardCount * @param repliceCount * @param kv * @return * @throws TRSException * @throws IOException */ public boolean createIndex(String index, int shardCount, int repliceCount, Mapkv) throws TRSException, IOException { try { String postdata = ""; // 获取键和值 for (Entry entry : kv.entrySet()) { postdata = postdata + "\"" + entry.getKey() + "\":{\"type\":\"" + entry.getValue().split(",")[0] + "\",\"analyzer\":\"" + entry.getValue().split(",")[1] + "\"},"; } postdata = postdata.substring(0, postdata.length() - 1);// 去点最后一个逗号 // 检查是否index已存在 IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response = client.admin().indices().exists(request).actionGet(); if (response.isExists()) { return false; } else { String source = "{\"type\":{\"properties\":{" + postdata + "}}} "; 
client.admin().indices().prepareCreate(index).setSettings(Settings.builder() .put("index.number_of_shards", shardCount).put("index.number_of_replicas", repliceCount)) .addMapping("type", source).get(); XContentBuilder mapping = null; return true; } } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 更新已存在的index的mapping * * @param index * @param type * @param kv * @return */ public boolean updateMapping(String index, String type, Map kv) { try { String postdata = ""; // 获取键和值 for (Entry entry : kv.entrySet()) { //postdata = postdata + "\"" + entry.getKey() + "\":{\"type\":\"" + entry.getValue() + "\"},"; postdata = postdata + "\"" + entry.getKey() + "\":{\"type\":\"" + entry.getValue().split(",")[0] + "\",\"analyzer\":\"" + entry.getValue().split(",")[1] + "\"},"; } postdata = postdata.substring(0, postdata.length() - 1);// 去点最后一个逗号 // 检查是否index已存在 IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response = client.admin().indices().exists(request).actionGet(); if (response.isExists()) {// 存在才执行 String source = "{\"properties\":{" + postdata + "}} "; client.admin().indices().preparePutMapping(index).setType(type).setSource(source).get(); return true; } else { return false; } } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 获取mapping * * @param index * @param type * @return * @throws IOException */ public Map getMapping(String index, String type) throws IOException { // 检查是否index已存在 IndicesExistsRequest request = new IndicesExistsRequest(index); IndicesExistsResponse response = client.admin().indices().exists(request).actionGet(); if (response.isExists()) {// 存在才执行 ClusterState cs = client.admin().cluster().prepareState().setIndices(index).execute().actionGet() .getState(); IndexMetaData imd = cs.getMetaData() // 这个名称同上面的名称 .index(index); // type的名称 MappingMetaData mdd = imd.mapping(type); return mdd.sourceAsMap(); } else { return null; } } /** * 重新设置index的副本数--- * * 
@param index * @param repliceCount * @throws TRSException */ public boolean setReplicaCountCount(String index, int repliceCount) throws TRSException { try { client.admin().indices().prepareUpdateSettings(index) .setSettings(Settings.builder().put("index.number_of_replicas", repliceCount)).get(); return true; } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 重新设置index的分页数--- * * @param indexName * @param shardCount * @throws TRSException */ public boolean setShardCount(String index, int shardCount) throws TRSException { try { client.admin().indices().prepareUpdateSettings(index) .setSettings(Settings.builder().put("number_of_shards", shardCount)).get(); return true; } catch (ElasticsearchException e) { e.printStackTrace(); return false; } } /** * 求索引库文档总数 * * @param index * @return */ public int getRecordCount(String index) { String result = client.prepareSearch(index).setFrom(0).execute().actionGet().toString(); int recordCount = Integer.parseInt((result.split("total")[2].split(",")[0].substring(2))); return recordCount; } /** * 索引库分片总数 * * @param index * @return */ public int getAllShardCount(String index) { String result = client.prepareSearch(index).setFrom(0).execute().actionGet().toString(); int recordCount = Integer.parseInt((result.split(",")[2].substring(19))); return recordCount; } /** * 索引库成功启动分片总数 * * @param index * @return */ public int getSuccessfulShardCount(String index) { String result = client.prepareSearch(index).setFrom(0).execute().actionGet().toString(); int recordCount = Integer.parseInt((result.split(",")[3].substring(13))); return recordCount; } /** * 索引库副本总数 * * @param index * @return */ public int getAllReplace(String index) { Settings settings = null; Integer shards = 0; Integer replicas = 0; GetSettingsResponse response = client.admin().indices().prepareGetSettings(index).get(); for (ObjectObjectCursor cursor : response.getIndexToSettings()) { // indexName = cursor.key; settings = cursor.value; shards = 
settings.getAsInt("index.number_of_shards", null); replicas = settings.getAsInt("index.number_of_replicas", null); // System.out.println(indexName); // System.out.println(shards); // System.out.println(replicas); } return replicas; } /** * 查看索引setting,这个有bug * * @param index * @return */ public Settings getSetting(String index) { String indexName = null; Settings settings = null; Integer shards = 0; Integer replicas = 0; GetSettingsResponse response = client.admin().indices().prepareGetSettings(index).get(); for (ObjectObjectCursor cursor : response.getIndexToSettings()) { indexName = cursor.key; settings = cursor.value; shards = settings.getAsInt("index.number_of_shards", null); replicas = settings.getAsInt("index.number_of_replicas", null); // System.out.println(indexName); // System.out.println(shards); // System.out.println(replicas); } return settings; } /** * 添加数据到指定index,type,id,成功返回true * * @param index * @param type * @param id * @param map * @return * @throws IOException */ public boolean creatDoc(String index, String type, String id, Map map) throws IOException { BulkRequestBuilder bulkRequest = client.prepareBulk(); XContentBuilder contentbuilder = null; // 开始解析json contentbuilder = jsonBuilder().startObject(); // 填充内容 for (String key : map.keySet()) { contentbuilder.field(key, map.get(key)); } // 结束解析json contentbuilder.endObject(); // add数据进bulk,并且指定index,type,id bulkRequest.add(client.prepareIndex(index, type, id).setSource(contentbuilder)); // 提交bulk BulkResponse bulkResponse = bulkRequest.get(); // 提示失败信息 if (bulkResponse.hasFailures()) { return false; // System.out.println(bulkResponse.buildFailureMessage()); } else { return true; } } /** * 删除数据 根据id.成功返回true * * @param index * @param type * @param id * @return * @throws IOException */ public boolean delRecord(String index, String type, String id) throws IOException { IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(index); IndicesExistsResponse inExistsResponse = 
client.admin().indices().exists(inExistsRequest).actionGet(); if (inExistsResponse.isExists() == true) { client.prepareDelete(index, type, id).execute().actionGet(); return true; } else { // System.out.println("data do not exits"); return false; } } /** * 删除数据 根据库名称. * * @param index * @return * @throws IOException */ public boolean delIndex(String index) throws IOException { IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(index); IndicesExistsResponse inExistsResponse = client.admin().indices().exists(inExistsRequest).actionGet(); if (inExistsResponse.isExists() == true) { client.admin().indices().prepareDelete(index).execute().actionGet(); return true; } else { // System.out.println("index do not exits"); return false; } } /** * 获取所有index值,输出为字符串数组 * * @return */ public String[] getAllIndex() { GetIndexResponse response = client.admin().indices().prepareGetIndex().execute().actionGet(); return response.getIndices(); } /** * 获取index的数量 * * @return */ public int getIndexCount() { GetIndexResponse response = client.admin().indices().prepareGetIndex().execute().actionGet(); return response.getIndices().length; } /** * 根据index获取该index下所有typeName,输出为字符串数组 * * @param index * @return */ public String[] getTypeByIndex(String index) { // 获取type GetMappingsResponse res = null; try { res = client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } ImmutableOpenMap mapping = res.mappings().get(index); // 将type按照string[]输出 int length = mapping.size(); String[] result = new String[length]; // 展示 int temp = 0; for (ObjectObjectCursor c : mapping) { result[temp] = c.key.toString(); // System.out.println(result[temp]); // System.out.println("type = " + c.key); // System.out.println(c.value.source()); temp++; } return result; } /** * 根据index,获取当前所有字段名称。 * * @param index * @return */ public Set getAllFields(String index) { // 
通过client设置查询的index、type、query.返回一个SearchResponse对象: SearchResponse response = client.prepareSearch(index).execute().actionGet(); SearchHits hits = response.getHits(); Set result = null; for (SearchHit hit : hits) {// 由于只想获取字段即可,所以取第一排的字段,之后break result = (hit.getSource().keySet());// 获取所有字段 break; } return result; } /** * 根据index,type,获取当前所有字段名称。注意,由于在海贝中没有type或者是表的字库概念,type统一设置就好,推荐用上个方法 * * @param index * @param type * @return */ public Set getAllFields(String index, String type) { // 通过client设置查询的index、type、query.返回一个SearchResponse对象: SearchResponse response = client.prepareSearch(index).setTypes(type).execute().actionGet(); SearchHits hits = response.getHits(); Set result = null; for (SearchHit hit : hits) {// 由于只想获取字段即可,所以取第一排的字段,之后break result = (hit.getSource().keySet());// 获取所有字段 // System.out.println(hits.getHits().length); // "score:" + hit.getScore() + ":\t" + break; } return result; } /** * 根据index,type,id,获取一条记录对象。 * * @param index * @param type * @param id * @return */ public GetResponse getRecord(String index, String type, String id) { GetResponse getResponse = client.prepareGet(index, type, id).get(); return getResponse; } /** * 所有的getrecords全写在这里 * * @param index * @param type * @param searchType * @param input * @param start * @param size * @return */ public SearchHits getRecords(String index, String type, int searchType, String input, int start, int size) { // 对于index的type进行查询,其中client即使得到的建立链接,下一步就是要将查询词给进去 SearchRequestBuilder responsebuilder = client.prepareSearch(index).setTypes(type); SearchResponse myresponse = null; String field = null; String text = null; switch (searchType) { case 1: /** * field:text */ // 提取字段 field = input.split(":")[0]; text = input.substring(field.length() + 1); // 设置搜索条件 myresponse = responsebuilder .setQuery(QueryBuilders.matchPhraseQuery(field, text))// 单字段查询 1 .setFrom(start).setSize(size).execute().actionGet(); break; case 2: /** * field:text1,text2,text3 */ // 提取字段 field = input.split(":")[0]; String[] 
textArray_2 = input.substring(field.length() + 1).split(","); // 设置搜索条件 myresponse = responsebuilder .setQuery(QueryBuilders.termsQuery(field, textArray_2)) // 多词条查询 2 .setFrom(start).setSize(size).execute().actionGet(); break; case 3: /** * 直接输搜索内容 */ myresponse = responsebuilder .setQuery(QueryBuilders.matchAllQuery()) // 查询全部 3 .setFrom(start).setSize(size).execute().actionGet(); break; case 4: /** * text:field1,field2,field3 */ // 提取字段 text = input.split(":")[0]; String[] fieldArray_4 = input.substring(field.length() + 1).split(","); // 设置搜索条件 myresponse = responsebuilder .setQuery(QueryBuilders.multiMatchQuery(text, fieldArray_4))// 多字段查询 4 .setFrom(start).setSize(size).execute().actionGet(); break; case 5: /** * text:field */ // 提取字段 field = input.split(":")[0]; text = input.substring(field.length() + 1); // 设置搜索条件 myresponse = responsebuilder .setQuery(QueryBuilders.wildcardQuery(field, text)) // 通配符查询 5 .setFrom(start).setSize(size).execute().actionGet(); break; case 6: /** * text:min TO max */ // 提取字段 field = input.split(":")[0]; int min = Integer.parseInt(input.substring(field.length() + 1).split(" TO ")[0]); int max = Integer.parseInt(input.substring(field.length() + 1).split(" TO ")[1]); // 设置搜索条件 myresponse = responsebuilder .setQuery(QueryBuilders.rangeQuery(field).from(min).to(max)) // 范围查询 6 .setFrom(start).setSize(size).execute().actionGet(); break; case 7: /** * text:field */ // 提取字段 field = input.split(":")[0]; text = input.substring(field.length() + 1); // 设置搜索条件 myresponse = responsebuilder.setQuery(QueryBuilders.regexpQuery(field, text)) // 正则查询 7 .setFrom(start).setSize(size).execute().actionGet(); break; } SearchHits hits = myresponse.getHits(); return hits; } /** * 返回集群名称 * * @return */ public String getClusterNameFromClust() { ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get(); String clusterName = healths.getClusterName(); // int numberOfDataNodes = healths.getNumberOfDataNodes(); // int numberOfNodes = 
healths.getNumberOfNodes(); // String clusterHealthStatu = null; // for (ClusterIndexHealth health : healths.getIndices().values()) { // String index = health.getIndex(); // ClusterHealthStatus status = health.getStatus(); // clusterHealthStatu = status.toString(); // } // System.out.println("111" + clusterName); // System.out.println("222" + numberOfDataNodes); // System.out.println("333" + numberOfNodes); return clusterName; } /** * 返回集群健康度 * * @return */ public String getClusterHealthStatuFromClust() { ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get(); String clusterHealthStatu = null; for (ClusterIndexHealth health : healths.getIndices().values()) { ClusterHealthStatus status = health.getStatus(); clusterHealthStatu = status.toString(); } return clusterHealthStatu; } /** * 返回集群dataNode个数 * * @return */ public int getNumberOfDataNodeFromClust() { ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get(); int numberOfDataNodes = healths.getNumberOfDataNodes(); return numberOfDataNodes; } /** * 返回集群节点数 * * @return */ public int getNumberOfNodeFromClust() { ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get(); int numberOfNodes = healths.getNumberOfNodes(); return numberOfNodes; } /** * 关闭连接. 
*/ public void close() { client.close(); } /** * 测试 * * @param args * @throws IOException * @throws TRSException */ public static void main(String[] args) throws IOException, TRSException { Map kv = new HashMap (); kv.put("content", "string,standard"); kv.put("title", "text,simple"); kv.put("content", "string,whitespace"); kv.put("title", "text,language"); ESClient esClient = new ESClient(); esClient.init("127.0.0.1", 9300, "elasticsearch"); // esClient.delIndex("shishi"); // esClient.createIndex("shishi", 1, 1, kv); esClient.setShardCount("demo", 10); System.out.println(esClient.getAllShardCount("demo")); // esClient.getMapping("demo", "type"); // esClient.createIndex("shishi", 1, 1); // esClient.getMapping("shishi", "type");//获取mapping // esClient.setRepliceCountCount("shishi", 2); // System.out.println(esClient.getAllReplace("shishi"));// 查看副本和分页的数量 // System.out.println(esClient.getAllShardCount("shishi"));// 查看副本和分页的数量 // esClient.delIndex("shishi"); // esClient.createIndex("shishi", 1, 1, kv); // esClient.updateMapping("shishi", "type", kv); // System.out.print(esClient.indexExists("demo")); // System.out.print(esClient.recordCount("demo")); // System.out.println(esClient.getSuccessfulShardCount("website_formal_20160612_disaster")); // esClient.setShardCount("demo", 10); // esClient.delRecordById("shishi", "type", "2"); // esClient.delIndex("shishi"); // esClient.createIndex("shishi", 1, 1); esClient.close(); } }