当前位置 : 主页 > 编程语言 > java >

elasticsearch+springboot实现客户端搜索

来源:互联网 收集:自由互联 发布时间:2022-09-02
pom.xml(maven配置): !-- Java High Level REST Client -- dependency groupIdorg.elasticsearch.client/groupId artifactIdelasticsearch-rest-high-level-client/artifactId exclusions exclusion groupIdorg.elasticsearch.client/groupId artifactIdela

pom.xml(maven配置):

<!-- Java High Level REST Client -->

<dependency>

<groupId>org.elasticsearch.client</groupId>

<artifactId>elasticsearch-rest-high-level-client</artifactId>

<exclusions>

<exclusion>

<groupId>org.elasticsearch.client</groupId>

<artifactId>elasticsearch-rest-client</artifactId>

</exclusion>

<exclusion>

<groupId>org.elasticsearch</groupId>

<artifactId>elasticsearch</artifactId>

</exclusion>

</exclusions>

</dependency>

<!-- elasticsearch搜索引擎 -->

<dependency>

<groupId>org.elasticsearch</groupId>

<artifactId>elasticsearch</artifactId>

</dependency>

<dependency>

<groupId>org.elasticsearch.client</groupId>

<artifactId>elasticsearch-rest-client</artifactId>

</dependency>


application.yml:


# elasticsearch-head(查看工具):​​http://127.0.0.1:9100/​​

elasticsearch:

enabled: true

server: 127.0.0.1:9200

connectionTimeoutMs: 1000

socketTimeoutMs: 30000

connectionRequestTimeoutMs: 500

maxConnTotal: 100

maxConnPerRoute: 100


java工具类:

@Configuration

@EnableConfigurationProperties({EsProperties.class})

@ConditionalOnProperty( //配置文件属性是否为true

value = {"elasticsearch.enabled"},

matchIfMissing = false

)

public class EsConfig {

@Bean(initMethod = "init",destroyMethod="stop")

public EsClient esClient(EsProperties esProperties) {

EsClient esClient = new EsClient(esProperties);

return esClient;

}

}


@Slf4j

public class EsClient {

private RestHighLevelClient restHighLevelClient;

private EsProperties esProperties;

public EsClient(EsProperties esProperties) {

this.esProperties=esProperties;

}

/**

* 初始化es客户端

*/

public void init() {

try{

RestClientBuilder builder=RestClient.builder(makeHttpHost(esProperties.getServer()));

//异步httpclient的连接延时配置

builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {

@Override

public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {

requestConfigBuilder.setConnectTimeout(esProperties.getConnectionTimeoutMs());

requestConfigBuilder.setSocketTimeout(esProperties.getSocketTimeoutMs());

requestConfigBuilder.setConnectionRequestTimeout(esProperties.getConnectionRequestTimeoutMs());

return requestConfigBuilder;

}

});

//异步httpclient的连接数配置

builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {

@Override

public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

httpClientBuilder.setMaxConnTotal(esProperties.getMaxConnTotal());

httpClientBuilder.setMaxConnPerRoute(esProperties.getMaxConnPerRoute());

return httpClientBuilder;

}

});

restHighLevelClient=new RestHighLevelClient(builder);

}catch(Exception e){

log.error("初始化es客户端出错-{}",e);

}

}

public void stop() {

try {

restHighLevelClient.close();

} catch (IOException e) {

log.error("es停止失败:{}",e);

}

}

private HttpHost makeHttpHost(String s) {

String[] address = s.split(":");

String ip = address[0];

int port = Integer.parseInt(address[1]);

return new HttpHost(ip, port, "http");

}

public RestHighLevelClient getClient() {

return restHighLevelClient;

}

public boolean createIndex(String index) {

boolean acknowledged = false;

try {

CreateIndexRequest request = new CreateIndexRequest(index);

//设置分片和副本数

// request.settings(Settings.builder()

// .put("index.number_of_shards", 5)

// .put("index.number_of_replicas", 1)

// );

CreateIndexResponse response = restHighLevelClient.indices().create(request,RequestOptions.DEFAULT);

​​log.info​​("索引创建成功-是否所有节点都已确认请求-{}",response.isAcknowledged());

acknowledged=response.isAcknowledged();

} catch (IOException e) {

log.error("索引索引失败:{}", e);

}

return acknowledged;

}

public boolean createIndex(String index,Builder settings, String mappings,String alias) {

boolean acknowledged = false;

try{

if(existsIndex(index)) {

log.warn("Index is exits!");

return false;

}

CreateIndexRequest request = new CreateIndexRequest(index);

if(settings!=null)request.settings(settings);;

if(StringUtils.isNotBlank(mappings))buildIndexMapping(request, mappings);

// 为索引设置一个别名

if(StringUtils.isNotBlank(alias))request.alias(new Alias(alias));

CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);

​​log.info​​("索引创建成功-是否所有节点都已确认请求-{}",response.isAcknowledged());

acknowledged=response.isAcknowledged();

}catch (Exception e){

log.error("索引创建失败:{}", e);

}

return acknowledged;

}

/**

* 设置分片

* @param request

*/

private void buildSetting(CreateIndexRequest request, String settings) {

request.settings(settings, XContentType.JSON);

}

/**

* 设置索引的mapping

* @param request

*/

private void buildIndexMapping(CreateIndexRequest request, String mappings) {

request.mapping(mappings, XContentType.JSON);

}

/**

* 删除索引

*/

public boolean deleteIndex(String index) {

boolean acknowledged = false;

try {

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return false;

}

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);

deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);

acknowledged = delete.isAcknowledged();

} catch (IOException e) {

log.error("删除索引失败:{}", e);

}

return acknowledged;

}

/**

* 判断索引是否存在

*/

public boolean existsIndex(String...index) {

boolean exists = false;

try{

GetIndexRequest request = new GetIndexRequest(index);

exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);

}catch (Exception e){

log.error("未知错误:{}", e);

}

return exists;

}

/**

* 打开索引

*/

public boolean openIndex(String index) {

boolean acknowledged = false;

try {

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return false;

}

OpenIndexRequest request = new OpenIndexRequest(index);

OpenIndexResponse openIndexResponse = restHighLevelClient.indices().open(request, RequestOptions.DEFAULT);

acknowledged = openIndexResponse.isAcknowledged();

​​log.info​​("openIndex:{}",acknowledged);

} catch (IOException e) {

log.error("openIndex:{}",e);

}

return acknowledged;

}

/**

* 关闭索引

*/

public boolean closeIndex(String index) {

boolean acknowledged = false;

try {

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return false;

}

CloseIndexRequest request = new CloseIndexRequest(index);

CloseIndexResponse closeIndexResponse = restHighLevelClient.indices().close(request, RequestOptions.DEFAULT);

acknowledged = closeIndexResponse.isAcknowledged();

​​log.info​​("closeIndex:{}",acknowledged);

} catch (IOException e) {

​​log.info​​("closeIndex:{}",e);

}

return acknowledged;

}

// doc 操作#########################################################################

/**

* 新增文档-单条数据插入

*/

public boolean insertDoc(String index,String id,String jsonStr){

try {

IndexRequest indexRequest = new IndexRequest(index).id(id).source(jsonStr,XContentType.JSON);

IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

​​log.info​​("insertDoc:{}",indexResponse.getId());

} catch (IOException e) {

log.error("insertDoc失败:{}",e);

return false;

}

return true;

}

/**

* 删除文档

*/

public boolean deleteDoc(String index,String id) {

try {

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return false;

}

DeleteRequest deleteRequest = new DeleteRequest(index);

​​deleteRequest.id​​(id);

DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);

if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {

​​log.info​​("文档删除成功");

}

} catch (IOException e) {

log.error("文档删除失败",e);

return false;

}

return true;

}

/**

* 更新文档

* doc内容-可以是JSON字符串,或者Map,或者XContentBuilder 对象

*/

public void updateDoc(String index,String id,String jsonStr) {

try {

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return;

}

UpdateRequest request = new UpdateRequest().index(index).id(id).doc(jsonStr,XContentType.JSON);

UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);

​​log.info​​("updateDoc:{}",updateResponse);

} catch (Exception e) {

log.error("updateDoc:{}",e);

}

}

/**

* 批量新增es

*/

public <T> void bulkInsert(String index,String idName,List<T> tList) {

try{

BulkRequest bulkRequest = new BulkRequest();

for (T t : tList) {

bulkRequest.add(new IndexRequest().index(index).id(BeanUtil.getProperty(t, idName).toString()).source(JSONUtil.toJsonStr(t),XContentType.JSON));

}

BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

for (BulkItemResponse bulkItemResponse : bulkResponse) {

if (bulkItemResponse.isFailed()) {

BulkItemResponse.Failure failure = bulkItemResponse.getFailure();

log.error("批量操作失败:{}",failure.getMessage());

continue;

}

}

}catch (Exception e){

log.error("批量插入索引失败:{}", e);

}

}

/**

* 批量删除es

*/

public <T> void bulkDel(String index,List<String> idList) {

try{

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return;

}

BulkRequest bulkRequest = new BulkRequest();

for (String id : idList) {

bulkRequest.add(new DeleteRequest().index(index).id(id));

}

BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

for (BulkItemResponse bulkItemResponse : bulkResponse) {

if (bulkItemResponse.isFailed()) {

BulkItemResponse.Failure failure = bulkItemResponse.getFailure();

log.error("批量操作失败:{}",failure.getMessage());

continue;

}

}

}catch (Exception e){

log.error("批量插入索引失败:{}", e);

}

}

/**

* 批量更新es

*/

public <T> void bulkUpdate(String index,String idName,List<T> tList) {

try{

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return;

}

BulkRequest bulkRequest = new BulkRequest();

for (T t : tList) {

bulkRequest.add(new UpdateRequest().index(index).id(BeanUtil.getProperty(t, idName).toString()).doc(JSONUtil.toJsonStr(t),XContentType.JSON));

}

BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

for (BulkItemResponse bulkItemResponse : bulkResponse) {

if (bulkItemResponse.isFailed()) {

BulkItemResponse.Failure failure = bulkItemResponse.getFailure();

log.error("批量操作失败:{}",failure.getMessage());

continue;

}

}

}catch (Exception e){

log.error("批量插入索引失败:{}", e);

}

}

/**

* 批量同步es

*/

public <T> void bulkDoc(String index,List<Map<String,Object>> mapList) {

try{

BulkRequest bulkRequest = new BulkRequest();

for (Map<String, Object> map : mapList) {

if("insert".equals(map.get("flag"))) {

bulkRequest.add(new IndexRequest().index(index).id(map.get("id").toString()).source(map.get("jsonStr"),XContentType.JSON));

}

if("delete".equals(map.get("flag"))) {

bulkRequest.add(new DeleteRequest().index(index).id(map.get("id").toString()));

}

if("update".equals(map.get("flag"))) {

bulkRequest.add(new UpdateRequest().index(index).id(map.get("id").toString()).doc(map.get("jsonStr"),XContentType.JSON));

}

}

BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

for (BulkItemResponse bulkItemResponse : bulkResponse) {

if (bulkItemResponse.isFailed()) {

BulkItemResponse.Failure failure = bulkItemResponse.getFailure();

log.error("批量操作失败:{}",failure.getMessage());

continue;

}

DocWriteResponse itemResponse = bulkItemResponse.getResponse();

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX

|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {

IndexResponse indexResponse = (IndexResponse) itemResponse;

} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {

UpdateResponse updateResponse = (UpdateResponse) itemResponse;

} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {

DeleteResponse deleteResponse = (DeleteResponse) itemResponse;

}

}

}catch (Exception e){

log.error("批量插入索引失败:{}", e);

}

}

/**

* 查询文档

*/

public void queryDoc(String index,String id) throws IOException {

GetRequest getRequest = new GetRequest();

getRequest.index(index);

​​getRequest.id​​(id);

GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);

​​log.info​​("queryDoc:{}",getResponse.getSourceAsString());

}

/**

* must和should 分别对应sql语句里面的 and 和 or

*

* 中文包含的搜索,类似于sql中的 like "%keyword%" ,不分词的前提

* QueryBuilders.matchPhraseQuery

*

* 精确搜索文档

* 例:QueryBuilders.boolQuery().

should(QueryBuilders.termQuery("context", "fox").queryName("q1")).

should(QueryBuilders.termQuery("context", "brown").queryName("q2"))

模糊查询

QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")

.fuzziness(​​Fuzziness.AUTO​​)

.prefixLength(3)

.maxExpansions(10);

*/

public <T> List<T> searchDoc(String[] index,QueryBuilder queryBuilder,Class<T> clz){

//查询多个文档库,其中多个文档库名之间用逗号隔开

List<T> searchList=new ArrayList<T>();

try {

if(!existsIndex(index)) {

log.warn("Index is not exits!");

return searchList;

}

SearchRequest searchRequest = new SearchRequest(index);

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

sourceBuilder.query(queryBuilder);

searchRequest.source(sourceBuilder);

SearchResponse searchResponse = ​​restHighLevelClient.search​​(searchRequest, RequestOptions.DEFAULT);

SearchHits hits = searchResponse.getHits(); //SearchHits提供有关所有匹配的全局信息,例如总命中数或最高分数:

SearchHit[] searchHits = hits.getHits();

for (SearchHit hit: searchHits) {

​​log.info​​("searchDoc:{}",hit.getSourceAsString());

}

hits.forEach(item -> searchList.add(JSONUtil.toBean(item.getSourceAsString(), clz)));

} catch (IOException e) {

log.error("searchDoc:{}",e);

}

return searchList;

}

public <T> TableDataInfo listPage(String[] index,PageDomain pageDomain,QueryBuilder queryBuilder,Class<T> clz) throws Exception{

//创建检索请求

SearchRequest searchRequest = new SearchRequest(index);

//创建搜索构建者

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

//设置构建搜索属性

sourceBuilder.from(pageDomain.getPageNum() * pageDomain.getPageSize()); // 需要注意这里是从多少条开始

sourceBuilder.size(pageDomain.getPageSize()); //共返回多少条数据

sourceBuilder.sort(new FieldSortBuilder(pageDomain.getOrderByColumn()).order(SortOrder.ASC)); //根据自己的需求排序

if(queryBuilder!=null) {

sourceBuilder.query(queryBuilder);

}

//传入构建进行搜索

searchRequest.source(sourceBuilder);

SearchResponse searchResponse = ​​restHighLevelClient.search​​(searchRequest, RequestOptions.DEFAULT);

//处理结果

RestStatus restStatus = searchResponse.status();

if (restStatus != RestStatus.OK){

throw new CustomException("搜索错误");

}

List<T> list = new ArrayList<T>();

SearchHits hits = searchResponse.getHits();

hits.forEach(item -> list.add(JSONUtil.toBean(item.getSourceAsString(), clz)));

TableDataInfo rspData = new TableDataInfo();

rspData.setCode(HttpStatus.SUCCESS);

rspData.setMsg("查询成功");

rspData.setRows(list);

rspData.setTotal(new PageInfo(list).getTotal());

return rspData;

}

}


工具调用:

@Autowired private EsClient esClient;

搜索:

List<Map> mapDataList=esClient.searchDoc(new String[] {index1,index2}, matchQueryBuilder, Map.class);


批量插入:

Map<String, Object> jsonMap = new HashMap<>();

Map<String, Object> properties = new HashMap<>();

Map<String, Object> name = new HashMap<>();

name.put("type", "text");

name.put("analyzer", "ik_max_word");

name.put("search_analyzer", "ik_smart");

properties.put("name", name);

jsonMap.put("properties", properties);

esClient.createIndex(batchIndex, Settings.builder()

.put("index.number_of_shards",1)

.put("index.number_of_replicas",1),JSONUtil.toJsonStr(jsonMap),null);

esClient.bulkInsert(batchIndex, "id", list);//"id"-对应list里面bean的主键


单个插入:

JSONObject jsonObject=JSONUtil.parseObj(kfGoodsInfo);

jsonObject.put("type", BussinessConstant.SERVICE_TYPE_2);

esClient.insertDoc(insertIndex,id, jsonObject.toString());


更新:


JSONObject jsonObject=JSONUtil.parseObj(kfGoodsInfo);

jsonObject.put("type", BussinessConstant.SERVICE_TYPE_2);

esClient.updateDoc(updateIndex,id, jsonObject.toString());

网友评论