pom.xml(maven配置):
<!-- Java High Level REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- elasticsearch搜索引擎 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</dependency>
application.yml:
# elasticsearch-head(查看工具):http://127.0.0.1:9100/
elasticsearch:
enabled: true
server: 127.0.0.1:9200
connectionTimeoutMs: 1000
socketTimeoutMs: 30000
connectionRequestTimeoutMs: 500
maxConnTotal: 100
maxConnPerRoute: 100
java工具类:
@Configuration
@EnableConfigurationProperties({EsProperties.class})
@ConditionalOnProperty( //配置文件属性是否为true
value = {"elasticsearch.enabled"},
matchIfMissing = false
)
public class EsConfig {
@Bean(initMethod = "init",destroyMethod="stop")
public EsClient esClient(EsProperties esProperties) {
EsClient esClient = new EsClient(esProperties);
return esClient;
}
}
@Slf4j
public class EsClient {
private RestHighLevelClient restHighLevelClient;
private EsProperties esProperties;
public EsClient(EsProperties esProperties) {
this.esProperties=esProperties;
}
/**
* 初始化es客户端
*/
public void init() {
try{
RestClientBuilder builder=RestClient.builder(makeHttpHost(esProperties.getServer()));
//异步httpclient的连接延时配置
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
requestConfigBuilder.setConnectTimeout(esProperties.getConnectionTimeoutMs());
requestConfigBuilder.setSocketTimeout(esProperties.getSocketTimeoutMs());
requestConfigBuilder.setConnectionRequestTimeout(esProperties.getConnectionRequestTimeoutMs());
return requestConfigBuilder;
}
});
//异步httpclient的连接数配置
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setMaxConnTotal(esProperties.getMaxConnTotal());
httpClientBuilder.setMaxConnPerRoute(esProperties.getMaxConnPerRoute());
return httpClientBuilder;
}
});
restHighLevelClient=new RestHighLevelClient(builder);
}catch(Exception e){
log.error("初始化es客户端出错-{}",e);
}
}
public void stop() {
try {
restHighLevelClient.close();
} catch (IOException e) {
log.error("es停止失败:{}",e);
}
}
private HttpHost makeHttpHost(String s) {
String[] address = s.split(":");
String ip = address[0];
int port = Integer.parseInt(address[1]);
return new HttpHost(ip, port, "http");
}
public RestHighLevelClient getClient() {
return restHighLevelClient;
}
public boolean createIndex(String index) {
boolean acknowledged = false;
try {
CreateIndexRequest request = new CreateIndexRequest(index);
//设置分片和副本数
// request.settings(Settings.builder()
// .put("index.number_of_shards", 5)
// .put("index.number_of_replicas", 1)
// );
CreateIndexResponse response = restHighLevelClient.indices().create(request,RequestOptions.DEFAULT);
log.info("索引创建成功-是否所有节点都已确认请求-{}",response.isAcknowledged());
acknowledged=response.isAcknowledged();
} catch (IOException e) {
log.error("索引索引失败:{}", e);
}
return acknowledged;
}
public boolean createIndex(String index,Builder settings, String mappings,String alias) {
boolean acknowledged = false;
try{
if(existsIndex(index)) {
log.warn("Index is exits!");
return false;
}
CreateIndexRequest request = new CreateIndexRequest(index);
if(settings!=null)request.settings(settings);;
if(StringUtils.isNotBlank(mappings))buildIndexMapping(request, mappings);
// 为索引设置一个别名
if(StringUtils.isNotBlank(alias))request.alias(new Alias(alias));
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
log.info("索引创建成功-是否所有节点都已确认请求-{}",response.isAcknowledged());
acknowledged=response.isAcknowledged();
}catch (Exception e){
log.error("索引创建失败:{}", e);
}
return acknowledged;
}
/**
* 设置分片
* @param request
*/
private void buildSetting(CreateIndexRequest request, String settings) {
request.settings(settings, XContentType.JSON);
}
/**
* 设置索引的mapping
* @param request
*/
private void buildIndexMapping(CreateIndexRequest request, String mappings) {
request.mapping(mappings, XContentType.JSON);
}
/**
* 删除索引
*/
public boolean deleteIndex(String index) {
boolean acknowledged = false;
try {
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return false;
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
acknowledged = delete.isAcknowledged();
} catch (IOException e) {
log.error("删除索引失败:{}", e);
}
return acknowledged;
}
/**
* 判断索引是否存在
*/
public boolean existsIndex(String...index) {
boolean exists = false;
try{
GetIndexRequest request = new GetIndexRequest(index);
exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
}catch (Exception e){
log.error("未知错误:{}", e);
}
return exists;
}
/**
* 打开索引
*/
public boolean openIndex(String index) {
boolean acknowledged = false;
try {
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return false;
}
OpenIndexRequest request = new OpenIndexRequest(index);
OpenIndexResponse openIndexResponse = restHighLevelClient.indices().open(request, RequestOptions.DEFAULT);
acknowledged = openIndexResponse.isAcknowledged();
log.info("openIndex:{}",acknowledged);
} catch (IOException e) {
log.error("openIndex:{}",e);
}
return acknowledged;
}
/**
* 关闭索引
*/
public boolean closeIndex(String index) {
boolean acknowledged = false;
try {
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return false;
}
CloseIndexRequest request = new CloseIndexRequest(index);
CloseIndexResponse closeIndexResponse = restHighLevelClient.indices().close(request, RequestOptions.DEFAULT);
acknowledged = closeIndexResponse.isAcknowledged();
log.info("closeIndex:{}",acknowledged);
} catch (IOException e) {
log.info("closeIndex:{}",e);
}
return acknowledged;
}
// doc 操作#########################################################################
/**
* 新增文档-单条数据插入
*/
public boolean insertDoc(String index,String id,String jsonStr){
try {
IndexRequest indexRequest = new IndexRequest(index).id(id).source(jsonStr,XContentType.JSON);
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("insertDoc:{}",indexResponse.getId());
} catch (IOException e) {
log.error("insertDoc失败:{}",e);
return false;
}
return true;
}
/**
* 删除文档
*/
public boolean deleteDoc(String index,String id) {
try {
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return false;
}
DeleteRequest deleteRequest = new DeleteRequest(index);
deleteRequest.id(id);
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
log.info("文档删除成功");
}
} catch (IOException e) {
log.error("文档删除失败",e);
return false;
}
return true;
}
/**
* 更新文档
* doc内容-可以是JSON字符串,或者Map,或者XContentBuilder 对象
*/
public void updateDoc(String index,String id,String jsonStr) {
try {
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return;
}
UpdateRequest request = new UpdateRequest().index(index).id(id).doc(jsonStr,XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
log.info("updateDoc:{}",updateResponse);
} catch (Exception e) {
log.error("updateDoc:{}",e);
}
}
/**
* 批量新增es
*/
public <T> void bulkInsert(String index,String idName,List<T> tList) {
try{
BulkRequest bulkRequest = new BulkRequest();
for (T t : tList) {
bulkRequest.add(new IndexRequest().index(index).id(BeanUtil.getProperty(t, idName).toString()).source(JSONUtil.toJsonStr(t),XContentType.JSON));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
log.error("批量操作失败:{}",failure.getMessage());
continue;
}
}
}catch (Exception e){
log.error("批量插入索引失败:{}", e);
}
}
/**
* 批量删除es
*/
public <T> void bulkDel(String index,List<String> idList) {
try{
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return;
}
BulkRequest bulkRequest = new BulkRequest();
for (String id : idList) {
bulkRequest.add(new DeleteRequest().index(index).id(id));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
log.error("批量操作失败:{}",failure.getMessage());
continue;
}
}
}catch (Exception e){
log.error("批量插入索引失败:{}", e);
}
}
/**
* 批量更新es
*/
public <T> void bulkUpdate(String index,String idName,List<T> tList) {
try{
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return;
}
BulkRequest bulkRequest = new BulkRequest();
for (T t : tList) {
bulkRequest.add(new UpdateRequest().index(index).id(BeanUtil.getProperty(t, idName).toString()).doc(JSONUtil.toJsonStr(t),XContentType.JSON));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
log.error("批量操作失败:{}",failure.getMessage());
continue;
}
}
}catch (Exception e){
log.error("批量插入索引失败:{}", e);
}
}
/**
* 批量同步es
*/
public <T> void bulkDoc(String index,List<Map<String,Object>> mapList) {
try{
BulkRequest bulkRequest = new BulkRequest();
for (Map<String, Object> map : mapList) {
if("insert".equals(map.get("flag"))) {
bulkRequest.add(new IndexRequest().index(index).id(map.get("id").toString()).source(map.get("jsonStr"),XContentType.JSON));
}
if("delete".equals(map.get("flag"))) {
bulkRequest.add(new DeleteRequest().index(index).id(map.get("id").toString()));
}
if("update".equals(map.get("flag"))) {
bulkRequest.add(new UpdateRequest().index(index).id(map.get("id").toString()).doc(map.get("jsonStr"),XContentType.JSON));
}
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
log.error("批量操作失败:{}",failure.getMessage());
continue;
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
}catch (Exception e){
log.error("批量插入索引失败:{}", e);
}
}
/**
* 查询文档
*/
public void queryDoc(String index,String id) throws IOException {
GetRequest getRequest = new GetRequest();
getRequest.index(index);
getRequest.id(id);
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
log.info("queryDoc:{}",getResponse.getSourceAsString());
}
/**
* must和should 分别对应sql语句里面的 and 和 or
*
* 中文包含的搜索,类似于sql中的 like "%keyword%" ,不分词的前提
* QueryBuilders.matchPhraseQuery
*
* 精确搜索文档
* 例:QueryBuilders.boolQuery().
should(QueryBuilders.termQuery("context", "fox").queryName("q1")).
should(QueryBuilders.termQuery("context", "brown").queryName("q2"))
模糊查询
QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
.fuzziness(Fuzziness.AUTO)
.prefixLength(3)
.maxExpansions(10);
*/
public <T> List<T> searchDoc(String[] index,QueryBuilder queryBuilder,Class<T> clz){
//查询多个文档库,其中多个文档库名之间用逗号隔开
List<T> searchList=new ArrayList<T>();
try {
if(!existsIndex(index)) {
log.warn("Index is not exits!");
return searchList;
}
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(queryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits(); //SearchHits提供有关所有匹配的全局信息,例如总命中数或最高分数:
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit: searchHits) {
log.info("searchDoc:{}",hit.getSourceAsString());
}
hits.forEach(item -> searchList.add(JSONUtil.toBean(item.getSourceAsString(), clz)));
} catch (IOException e) {
log.error("searchDoc:{}",e);
}
return searchList;
}
public <T> TableDataInfo listPage(String[] index,PageDomain pageDomain,QueryBuilder queryBuilder,Class<T> clz) throws Exception{
//创建检索请求
SearchRequest searchRequest = new SearchRequest(index);
//创建搜索构建者
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//设置构建搜索属性
sourceBuilder.from(pageDomain.getPageNum() * pageDomain.getPageSize()); // 需要注意这里是从多少条开始
sourceBuilder.size(pageDomain.getPageSize()); //共返回多少条数据
sourceBuilder.sort(new FieldSortBuilder(pageDomain.getOrderByColumn()).order(SortOrder.ASC)); //根据自己的需求排序
if(queryBuilder!=null) {
sourceBuilder.query(queryBuilder);
}
//传入构建进行搜索
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//处理结果
RestStatus restStatus = searchResponse.status();
if (restStatus != RestStatus.OK){
throw new CustomException("搜索错误");
}
List<T> list = new ArrayList<T>();
SearchHits hits = searchResponse.getHits();
hits.forEach(item -> list.add(JSONUtil.toBean(item.getSourceAsString(), clz)));
TableDataInfo rspData = new TableDataInfo();
rspData.setCode(HttpStatus.SUCCESS);
rspData.setMsg("查询成功");
rspData.setRows(list);
rspData.setTotal(new PageInfo(list).getTotal());
return rspData;
}
}
工具调用:
@Autowired private EsClient esClient;
搜索:
List<Map> mapDataList=esClient.searchDoc(new String[] {index1,index2}, matchQueryBuilder, Map.class);
批量插入:
Map<String, Object> jsonMap = new HashMap<>();
Map<String, Object> properties = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("type", "text");
name.put("analyzer", "ik_max_word");
name.put("search_analyzer", "ik_smart");
properties.put("name", name);
jsonMap.put("properties", properties);
esClient.createIndex(batchIndex, Settings.builder()
.put("index.number_of_shards",1)
.put("index.number_of_replicas",1),JSONUtil.toJsonStr(jsonMap),null);
esClient.bulkInsert(batchIndex, "id", list);//"id"-对应list里面bean的主键
单个插入:
JSONObject jsonObject=JSONUtil.parseObj(kfGoodsInfo);
jsonObject.put("type", BussinessConstant.SERVICE_TYPE_2);
esClient.insertDoc(insertIndex,id, jsonObject.toString());
更新:
JSONObject jsonObject=JSONUtil.parseObj(kfGoodsInfo);
jsonObject.put("type", BussinessConstant.SERVICE_TYPE_2);
esClient.updateDoc(updateIndex,id, jsonObject.toString());